Skip to content

Commit 02abe6b

Browse files
garyrussell authored and artembilan committed
GH-2528: DLPR - Support Header Replacement (#2529)
Resolves #2528. When adding headers in a DLPR headers function, header values accumulate because Kafka headers support multiple values. Provide a mechanism to allow adding a header that replaces any existing header with that name. **cherry-pick to 2.9.x** * Fix javadoc.
1 parent f69c257 commit 02abe6b

File tree

3 files changed

+78
-5
lines changed

3 files changed

+78
-5
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,9 @@ protected void configureCustomizers(CustomizersConfigurer customizersConfigurer)
601601

602602
Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory), you can add a `headersFunction` to the factory - `factory.setHeadersFunction((rec, ex) -> { ... })`
603603

604+
By default, any headers added will be cumulative - Kafka headers can contain multiple values.
605+
Starting with version 2.9.5, if the `Headers` returned by the function contains a header of type `DeadLetterPublishingRecoverer.SingleRecordHeader`, then any existing values for that header will be removed and only the new single value will remain.
606+
604607
[[retry-topic-combine-blocking]]
605608
==== Combining Blocking and Non-Blocking Retries
606609

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

+46-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,7 +39,9 @@
3939
import org.apache.kafka.clients.producer.ProducerRecord;
4040
import org.apache.kafka.common.PartitionInfo;
4141
import org.apache.kafka.common.TopicPartition;
42+
import org.apache.kafka.common.header.Header;
4243
import org.apache.kafka.common.header.Headers;
44+
import org.apache.kafka.common.header.internals.RecordHeader;
4345
import org.apache.kafka.common.header.internals.RecordHeaders;
4446

4547
import org.springframework.core.log.LogAccessor;
@@ -226,7 +228,9 @@ public void setRetainExceptionHeader(boolean retainExceptionHeader) {
226228

227229
/**
228230
* Set a function which will be called to obtain additional headers to add to the
229-
* published record.
231+
* published record. If a {@link Header} returned is an instance of
232+
* {@link SingleRecordHeader}, then that header will replace any existing header of
233+
* that name, rather than being appended as a new value.
230234
* @param headersFunction the headers function.
231235
* @since 2.5.4
232236
* @see #addHeadersFunction(BiFunction)
@@ -426,7 +430,10 @@ public void includeHeader(HeaderNames.HeadersToAdd... headers) {
426430
/**
427431
* Add a function which will be called to obtain additional headers to add to the
428432
* published record. Functions are called in the order that they are added, and after
429-
* any function passed into {@link #setHeadersFunction(BiFunction)}.
433+
* any function passed into {@link #setHeadersFunction(BiFunction)}. If a
434+
* {@link Header} returned is an instance of {@link SingleRecordHeader}, then that
435+
* header will replace any existing header of that name, rather than being appended as
436+
* a new value.
430437
* @param headersFunction the headers function.
431438
* @since 2.8.4
432439
* @see #setHeadersFunction(BiFunction)
@@ -722,7 +729,12 @@ private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, E
722729
maybeAddOriginalHeaders(kafkaHeaders, record, exception);
723730
Headers headers = this.headersFunction.apply(record, exception);
724731
if (headers != null) {
725-
headers.forEach(kafkaHeaders::add);
732+
headers.forEach(header -> {
733+
if (header instanceof SingleRecordHeader) {
734+
kafkaHeaders.remove(header.key());
735+
}
736+
kafkaHeaders.add(header);
737+
});
726738
}
727739
}
728740

@@ -1389,4 +1401,34 @@ public interface ExceptionHeadersCreator {
13891401

13901402
}
13911403

1404+
/**
1405+
* A {@link Header} that indicates that this header should replace any existing headers
1406+
* with this name, rather than being appended to the headers, which is the normal behavior.
1407+
*
1408+
* @since 2.9.5
1409+
* @see DeadLetterPublishingRecoverer#setHeadersFunction(BiFunction)
1410+
* @see DeadLetterPublishingRecoverer#addHeadersFunction(BiFunction)
1411+
*/
1412+
public static class SingleRecordHeader extends RecordHeader {
1413+
1414+
/**
1415+
* Construct an instance.
1416+
* @param key the key.
1417+
* @param value the value.
1418+
*/
1419+
public SingleRecordHeader(String key, byte[] value) {
1420+
super(key, value);
1421+
}
1422+
1423+
/**
1424+
* Construct an instance.
1425+
* @param keyBuffer the key buffer.
1426+
* @param valueBuffer the value buffer.
1427+
*/
1428+
public SingleRecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
1429+
super(keyBuffer, valueBuffer);
1430+
}
1431+
1432+
}
1433+
13921434
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -67,6 +67,7 @@
6767
import org.springframework.kafka.core.KafkaOperations.OperationsCallback;
6868
import org.springframework.kafka.core.ProducerFactory;
6969
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames;
70+
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.SingleRecordHeader;
7071
import org.springframework.kafka.support.KafkaHeaders;
7172
import org.springframework.kafka.support.SendResult;
7273
import org.springframework.kafka.support.serializer.DeserializationException;
@@ -878,6 +879,33 @@ void immutableHeaders() {
878879
assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(12);
879880
}
880881

882+
@SuppressWarnings({ "unchecked", "rawtypes" })
883+
@Test
884+
void replaceNotAppendHeader() {
885+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
886+
CompletableFuture future = mock(CompletableFuture.class);
887+
given(template.send(any(ProducerRecord.class))).willReturn(future);
888+
Headers headers = new RecordHeaders().add(new RecordHeader("foo", "orig".getBytes()));
889+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE,
890+
-1, -1, null, "bar", headers, Optional.empty());
891+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
892+
recoverer.setHeadersFunction((rec, ex) -> {
893+
RecordHeaders toReplace = new RecordHeaders(
894+
new RecordHeader[] { new SingleRecordHeader("foo", "one".getBytes()) });
895+
return toReplace;
896+
});
897+
recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException()));
898+
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
899+
verify(template).send(producerRecordCaptor.capture());
900+
ProducerRecord outRecord = producerRecordCaptor.getValue();
901+
Headers outHeaders = outRecord.headers();
902+
assertThat(KafkaTestUtils.getPropertyValue(outHeaders, "headers", List.class)).hasSize(11);
903+
Iterator<Header> iterator = outHeaders.headers("foo").iterator();
904+
assertThat(iterator.hasNext()).isTrue();
905+
assertThat(iterator.next().value()).isEqualTo("one".getBytes());
906+
assertThat(iterator.hasNext()).isFalse();
907+
}
908+
881909
@SuppressWarnings("unchecked")
882910
@Test
883911
void nonCompliantProducerFactory() throws Exception {

0 commit comments

Comments
 (0)