diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 6d2f7aa1d9..671c871aa5 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -5641,7 +5641,7 @@ Key exceptions are only caused by `DeserializationException` s so there is no `D There are two mechanisms to add more headers. 1. Subclass the recoverer and override `createProducerRecord()` - call `super.createProducerRecord()` and add more headers. -2. Provide a `BiFunction` to receive the consumer record and exception, returning a `Headers` object; headers from there will be copied to the final producer record. +2. Provide a `BiFunction` to receive the consumer record and exception, returning a `Headers` object; headers from there will be copied to the final producer record; also see <>. Use `setHeadersFunction()` to set the `BiFunction`. The second is simpler to implement but the first has more information available, including the already assembled standard headers. @@ -5736,6 +5736,34 @@ The reason for the two properties is because, while you might want to retain onl `appendOriginalHeaders` is applied to all headers named `*ORIGINAL*` while `stripPreviousExceptionHeaders` is applied to all headers named `*EXCEPTION*`. +Starting with version 2.8.4, you can now control which of the standard headers will be added to the output record. +See the `enum HeadersToAdd` for the generic names of the (currently) 10 standard headers that are added by default (these are not the actual header names, just an abstraction; the actual header names are set up by the `getHeaderNames()` method, which subclasses can override). + +To exclude headers, use the `excludeHeader()` method; for example, to suppress adding the exception stack trace in a header, use: + +==== +[source, java] +---- +DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); +recoverer.excludeHeader(HeaderNames.HeadersToAdd.EX_STACKTRACE); +---- +==== + +In addition, you can completely customize the addition of exception headers by providing an `ExceptionHeadersCreator`; this also disables all standard exception headers. + +==== +[source, java] +---- +DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); +recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> { + kafkaHeaders.add(new RecordHeader(..., ...)); +}); +---- +==== + +Also starting with version 2.8.4, you can now provide multiple header functions via the `addHeadersFunction` method. +This allows additional functions to be applied even if another function has already been registered, for example, when using <>; a short example follows. +
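+The following is only a minimal sketch; the `foo` and `bar` header names and their values are illustrative, not headers used by the framework: + +==== +[source, java] +---- +DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); +recoverer.addHeadersFunction((rec, ex) -> new RecordHeaders(new RecordHeader[] { new RecordHeader("foo", "one".getBytes()) })); +recoverer.addHeadersFunction((rec, ex) -> new RecordHeaders(new RecordHeader[] { new RecordHeader("bar", "two".getBytes()) })); +---- +==== + +Headers returned by a later function are appended to those returned by earlier functions (and to any returned by the function passed into `setHeadersFunction()`). +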
Also see <> with <>. [[exp-backoff]] diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 0d52b18f38..e20d5f859c 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -433,6 +433,8 @@ DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) ---- ==== +Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory), you can add a `headersFunction` to the factory - `factory.setHeadersFunction((rec, ex) -> { ... })`; a sketch follows. +
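+This minimal sketch assumes the `x-custom` header name and the use of the exception class name as its value; both are illustrative only: + +==== +[source, java] +---- +factory.setHeadersFunction((rec, ex) -> { + Headers headers = new RecordHeaders(); + headers.add(new RecordHeader("x-custom", ex.getClass().getName().getBytes(StandardCharsets.UTF_8))); + return headers; +}); +---- +==== + +Because the factory registers this function with the recoverer's `addHeadersFunction()`, the headers it returns are added in addition to the retry headers. +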
[[retry-topic-combine-blocking]] ==== Combining blocking and non-blocking retries diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index c9aa9c29bc..444132b430 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -83,6 +83,8 @@ See <> for more information. The property `stripPreviousExceptionHeaders` is now `true` by default. +There are now several techniques to customize which headers are added to the output record. + See <> for more information. [[x28-retryable-topics-changes]] diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java index 3542eb12cf..74744b12eb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -66,6 +67,9 @@ */ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implements ConsumerAwareRecordRecoverer { + private static final BiFunction, Exception, Headers> DEFAULT_HEADERS_FUNCTION = + (rec, ex) -> null; + protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR private static final BiFunction, Exception, TopicPartition> @@ -83,9 +87,11 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement private final Function, KafkaOperations> templateResolver; + private final EnumSet whichHeaders = EnumSet.allOf(HeaderNames.HeadersToAdd.class); + private boolean retainExceptionHeader; - private BiFunction, Exception, Headers> headersFunction = (rec, ex) -> null; + private BiFunction, Exception, Headers> headersFunction = DEFAULT_HEADERS_FUNCTION; private boolean verifyPartition = true; @@ -105,6 +111,8 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement private boolean skipSameTopicFatalExceptions = true; + private ExceptionHeadersCreator exceptionHeadersCreator = this::addExceptionInfoHeaders; + /** * Create an instance with the provided template and a default destination resolving * function that returns a TopicPartition based on the original topic (appended with ".DLT") @@ -219,9 +227,14 @@ public void setRetainExceptionHeader(boolean retainExceptionHeader) { * published record. * @param headersFunction the headers function. * @since 2.5.4 + * @see #addHeadersFunction(BiFunction) */ public void setHeadersFunction(BiFunction, Exception, Headers> headersFunction) { Assert.notNull(headersFunction, "'headersFunction' cannot be null"); + if (!this.headersFunction.equals(DEFAULT_HEADERS_FUNCTION)) { + this.logger.warn(() -> "Replacing custom headers function: " + this.headersFunction + + ", consider using addHeadersFunction() if you need multiple functions"); + } this.headersFunction = headersFunction; } @@ -326,6 +339,79 @@ public void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions this.skipSameTopicFatalExceptions = skipSameTopicFatalExceptions; } + /** + * Set an {@link ExceptionHeadersCreator} implementation to completely take over + * setting the exception headers in the output record. Disables all headers that are + * set by default. + * @param headersCreator the creator. + * @since 2.8.4 + */ + public void setExceptionHeadersCreator(ExceptionHeadersCreator headersCreator) { + Assert.notNull(headersCreator, "'headersCreator' cannot be null"); + this.exceptionHeadersCreator = headersCreator; + } + + /** + * Clear the header inclusion bit for one or more headers. + * @param headers the headers to clear. + * @since 2.8.4 + */ + public void excludeHeader(HeaderNames.HeadersToAdd... headers) { + Assert.notNull(headers, "'headers' cannot be null"); + Assert.noNullElements(headers, "'headers' cannot include null elements"); + for (HeaderNames.HeadersToAdd header : headers) { + this.whichHeaders.remove(header); + } + } + + /** + * Set the header inclusion bit for one or more headers. + * @param headers the headers to set. + * @since 2.8.4 + */ + public void includeHeader(HeaderNames.HeadersToAdd... headers) { + Assert.notNull(headers, "'headers' cannot be null"); + Assert.noNullElements(headers, "'headers' cannot include null elements"); + for (HeaderNames.HeadersToAdd header : headers) { + this.whichHeaders.add(header); + } + } + + /** + * Add a function which will be called to obtain additional headers to add to the + * published record. Functions are called in the order that they are added, and after + * any function passed into {@link #setHeadersFunction(BiFunction)}. + * @param headersFunction the headers function. 
+ * @since 2.8.4 + * @see #setHeadersFunction(BiFunction) + */ + public void addHeadersFunction(BiFunction, Exception, Headers> headersFunction) { + Assert.notNull(headersFunction, "'headersFunction' cannot be null"); + if (this.headersFunction.equals(DEFAULT_HEADERS_FUNCTION)) { + this.headersFunction = headersFunction; + } + else { + BiFunction, Exception, Headers> toCompose = this.headersFunction; + this.headersFunction = (rec, ex) -> { + Headers headers1 = toCompose.apply(rec, ex); + if (headers1 == null) { + headers1 = new RecordHeaders(); + } + Headers headers2 = headersFunction.apply(rec, ex); + try { + if (headers2 != null) { + headers2.forEach(headers1::add); + } + } + catch (IllegalStateException isex) { + headers1 = new RecordHeaders(headers1); + headers2.forEach(headers1::add); // NOSONAR, never null here + } + return headers1; + }; + } + } + @SuppressWarnings("unchecked") @Override public void accept(ConsumerRecord record, @Nullable Consumer consumer, Exception exception) { @@ -367,16 +453,16 @@ private void addAndEnhanceHeaders(ConsumerRecord record, Exception excepti if (!this.retainExceptionHeader) { headers.remove(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER); } - addExceptionInfoHeaders(headers, kDeserEx, true); + this.exceptionHeadersCreator.create(headers, kDeserEx, true, this.headerNames); } if (vDeserEx != null) { if (!this.retainExceptionHeader) { headers.remove(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER); } - addExceptionInfoHeaders(headers, vDeserEx, false); + this.exceptionHeadersCreator.create(headers, vDeserEx, false, this.headerNames); } if (kDeserEx == null && vDeserEx == null) { - addExceptionInfoHeaders(headers, exception, false); + this.exceptionHeadersCreator.create(headers, exception, false, this.headerNames); } enhanceHeaders(headers, record, exception); // NOSONAR headers are never null } @@ -573,56 +659,64 @@ private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord record, E private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord record, Exception ex) { maybeAddHeader(kafkaHeaders, this.headerNames.original.topicHeader, - record.topic().getBytes(StandardCharsets.UTF_8)); + record.topic().getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.TOPIC); maybeAddHeader(kafkaHeaders, this.headerNames.original.partitionHeader, - ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array()); + ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array(), + HeaderNames.HeadersToAdd.PARTITION); maybeAddHeader(kafkaHeaders, this.headerNames.original.offsetHeader, - ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array()); + ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array(), HeaderNames.HeadersToAdd.OFFSET); maybeAddHeader(kafkaHeaders, this.headerNames.original.timestampHeader, - ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array()); + ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array(), HeaderNames.HeadersToAdd.TS); maybeAddHeader(kafkaHeaders, this.headerNames.original.timestampTypeHeader, - record.timestampType().toString().getBytes(StandardCharsets.UTF_8)); + record.timestampType().toString().getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.TS_TYPE); if (ex instanceof ListenerExecutionFailedException) { String consumerGroup = ((ListenerExecutionFailedException) ex).getGroupId(); if (consumerGroup != null) { maybeAddHeader(kafkaHeaders, this.headerNames.original.consumerGroup, - consumerGroup.getBytes(StandardCharsets.UTF_8)); + 
consumerGroup.getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.GROUP); } } } - private void maybeAddHeader(Headers kafkaHeaders, String header, byte[] value) { - if (this.appendOriginalHeaders || kafkaHeaders.lastHeader(header) == null) { + private void maybeAddHeader(Headers kafkaHeaders, String header, byte[] value, HeaderNames.HeadersToAdd hta) { + if (this.whichHeaders.contains(hta) + && (this.appendOriginalHeaders || kafkaHeaders.lastHeader(header) == null)) { kafkaHeaders.add(header, value); } } - void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception, boolean isKey) { - appendOrReplace(kafkaHeaders, new RecordHeader(isKey ? this.headerNames.exceptionInfo.keyExceptionFqcn - : this.headerNames.exceptionInfo.exceptionFqcn, - exception.getClass().getName().getBytes(StandardCharsets.UTF_8))); - if (!isKey && exception.getCause() != null) { - appendOrReplace(kafkaHeaders, new RecordHeader(this.headerNames.exceptionInfo.exceptionCauseFqcn, - exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8))); + private void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception, boolean isKey, + HeaderNames names) { + + appendOrReplace(kafkaHeaders, new RecordHeader(isKey ? names.exceptionInfo.keyExceptionFqcn + : names.exceptionInfo.exceptionFqcn, + exception.getClass().getName().getBytes(StandardCharsets.UTF_8)), HeaderNames.HeadersToAdd.EXCEPTION); + if (exception.getCause() != null) { + appendOrReplace(kafkaHeaders, new RecordHeader(names.exceptionInfo.exceptionCauseFqcn, + exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8)), + HeaderNames.HeadersToAdd.EX_CAUSE); } String message = exception.getMessage(); if (message != null) { appendOrReplace(kafkaHeaders, new RecordHeader(isKey - ? this.headerNames.exceptionInfo.keyExceptionMessage - : this.headerNames.exceptionInfo.exceptionMessage, - exception.getMessage().getBytes(StandardCharsets.UTF_8))); + ? names.exceptionInfo.keyExceptionMessage + : names.exceptionInfo.exceptionMessage, + exception.getMessage().getBytes(StandardCharsets.UTF_8)), HeaderNames.HeadersToAdd.EX_MSG); } appendOrReplace(kafkaHeaders, new RecordHeader(isKey - ? this.headerNames.exceptionInfo.keyExceptionStacktrace - : this.headerNames.exceptionInfo.exceptionStacktrace, - getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8))); + ? names.exceptionInfo.keyExceptionStacktrace + : names.exceptionInfo.exceptionStacktrace, + getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)), + HeaderNames.HeadersToAdd.EX_STACKTRACE); } - private void appendOrReplace(Headers headers, RecordHeader header) { - if (this.stripPreviousExceptionHeaders) { - headers.remove(header.key()); + private void appendOrReplace(Headers headers, RecordHeader header, HeaderNames.HeadersToAdd hta) { + if (this.whichHeaders.contains(hta)) { + if (this.stripPreviousExceptionHeaders) { + headers.remove(header.key()); + } + headers.add(header); } - headers.add(header); } private String getStackTraceAsString(Throwable cause) { @@ -665,7 +759,66 @@ protected HeaderNames getHeaderNames() { */ public static class HeaderNames { + /** + * Bits representing which headers to add. + * @since 2.8.4 + */ + public enum HeadersToAdd { + + /** + * The offset of the failed record. + */ + OFFSET, + + /** + * The timestamp of the failed record. + */ + TS, + + /** + * The timestamp type of the failed record. + */ + TS_TYPE, + + /** + * The original topic of the failed record. 
+ */ + TOPIC, + + /** + * The partition from which the failed record was received. + */ + PARTITION, + + /** + * The consumer group that received the failed record. + */ + GROUP, + + /** + * The exception class name. + */ + EXCEPTION, + + /** + * The exception cause class name. + */ + EX_CAUSE, + + /** + * The exception message. + */ + EX_MSG, + + /** + * The exception stack trace. + */ + EX_STACKTRACE; + + } + private final HeaderNames.Original original; + private final ExceptionInfo exceptionInfo; HeaderNames(HeaderNames.Original original, ExceptionInfo exceptionInfo) { @@ -673,7 +826,30 @@ public static class HeaderNames { this.exceptionInfo = exceptionInfo; } - static class Original { + /** + * The header names for the original record headers. + * @return the original. + * @since 2.8.4 + */ + public HeaderNames.Original getOriginal() { + return this.original; + } + + /** + * The header names for the exception headers. + * @return the exceptionInfo + * @since 2.8.4 + */ + public ExceptionInfo getExceptionInfo() { + return this.exceptionInfo; + } + + /** + * Header names for original record property headers. + * + * @since 2.8.4 + */ + public static class Original { final String offsetHeader; // NOSONAR @@ -700,9 +876,63 @@ static class Original { this.partitionHeader = partitionHeader; this.consumerGroup = consumerGroup; } + + /** + * The header name for the offset. + * @return the offsetHeader. + */ + public String getOffsetHeader() { + return this.offsetHeader; + } + + /** + * The header name for the timestamp. + * @return the timestampHeader. + */ + public String getTimestampHeader() { + return this.timestampHeader; + } + + /** + * The header name for the timestamp type. + * @return the timestampTypeHeader. + */ + public String getTimestampTypeHeader() { + return this.timestampTypeHeader; + } + + /** + * The header name for the topic. + * @return the topicHeader. + */ + public String getTopicHeader() { + return this.topicHeader; + } + + /** + * The header name for the partition. + * @return the partitionHeader + */ + public String getPartitionHeader() { + return this.partitionHeader; + } + + /** + * The header name for the consumer group. + * @return the consumerGroup + */ + public String getConsumerGroup() { + return this.consumerGroup; + } + } - static class ExceptionInfo { + /** + * Header names for exception headers. + * + * @since 2.8.4 + */ + public static class ExceptionInfo { final String keyExceptionFqcn; // NOSONAR @@ -733,6 +963,63 @@ static class ExceptionInfo { this.keyExceptionStacktrace = keyExceptionStacktrace; this.exceptionStacktrace = exceptionStacktrace; } + + /** + * The header name for the key exception class. + * @return the keyExceptionFqcn. + */ + public String getKeyExceptionFqcn() { + return this.keyExceptionFqcn; + } + + /** + * The header name for the value exception class. + * @return the exceptionFqcn. + */ + public String getExceptionFqcn() { + return this.exceptionFqcn; + } + + /** + * The header name for the exception cause. + * @return the exceptionCauseFqcn. + */ + public String getExceptionCauseFqcn() { + return this.exceptionCauseFqcn; + } + + /** + * The header name for the key exception message. + * @return the keyExceptionMessage. + */ + public String getKeyExceptionMessage() { + return this.keyExceptionMessage; + } + + /** + * The header name for the exception message. + * @return the exceptionMessage. 
+ */ + public String getExceptionMessage() { + return this.exceptionMessage; + } + + /** + * The header name for the key exception stack trace. + * @return the keyExceptionStacktrace + */ + public String getKeyExceptionStacktrace() { + return this.keyExceptionStacktrace; + } + + /** + * The header name for the exception stack trace. + * @return the exceptionStacktrace + */ + public String getExceptionStacktrace() { + return this.exceptionStacktrace; + } + } /** @@ -1004,5 +1291,26 @@ public DeadLetterPublishingRecoverer.HeaderNames build() { } } } + } + + /** + * Use this to provide a custom implementation to take complete control over exception + * header creation for the output record. + * + * @since 2.8.4 + */ + public interface ExceptionHeadersCreator { + + /** + * Create exception headers. + * @param kafkaHeaders the {@link Headers} to add the header(s) to. + * @param exception the exception. + * @param isKey whether the exception is for a key or value. + * @param headerNames the header names to use. + */ + void create(Headers kafkaHeaders, Exception exception, boolean isKey, HeaderNames headerNames); + + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java index fa3645558b..c41947a237 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Consumer; import org.apache.commons.logging.LogFactory; @@ -63,10 +64,22 @@ public class DeadLetterPublishingRecovererFactory { private Consumer recovererCustomizer = recoverer -> { }; + private BiFunction, Exception, Headers> headersFunction; + public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) { this.destinationTopicResolver = destinationTopicResolver; } + /** + * Set a function that creates additional headers for the output record, in addition to the standard + * retry headers added by this factory. + * @param headersFunction the function. + * @since 2.8.4 + */ + public void setHeadersFunction(BiFunction, Exception, Headers> headersFunction) { + this.headersFunction = headersFunction; + } + /** * Add exception type to the default list. 
By default, the following exceptions will * not be retried: @@ -137,6 +150,9 @@ protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() { }; recoverer.setHeadersFunction((consumerRecord, e) -> addHeaders(consumerRecord, e, getAttempts(consumerRecord))); + if (this.headersFunction != null) { + recoverer.addHeadersFunction(this.headersFunction); + } recoverer.setFailIfSendResultIsError(true); recoverer.setAppendOriginalHeaders(false); recoverer.setThrowIfNoDestinationReturned(false); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java index 056285ecd4..8e6d96ec23 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java @@ -25,6 +25,7 @@ import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -40,6 +41,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -63,10 +65,12 @@ import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaOperations.OperationsCallback; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.kafka.support.serializer.SerializationUtils; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.SettableListenableFuture; @@ -551,6 +555,8 @@ void notThrowIfNoDestinationReturnedByDefault() { @Test void noCircularRoutingIfFatal() { KafkaOperations template = mock(KafkaOperations.class); + ListenableFuture future = mock(ListenableFuture.class); + given(template.send(any(ProducerRecord.class))).willReturn(future); ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, (cr, e) -> new TopicPartition("foo", 0)); @@ -586,4 +592,288 @@ void doNotSkipCircularFatalIfSet() { verify(template, times(3)).send(any(ProducerRecord.class)); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void headerBitsTurnedOffOneByOne() { + KafkaOperations template = mock(KafkaOperations.class); + ListenableFuture future = mock(ListenableFuture.class); + given(template.send(any(ProducerRecord.class))).willReturn(future); + ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); + DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + ArgumentCaptor producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class); + 
verify(template).send(producerRecordCaptor.capture()); + ProducerRecord outRecord = producerRecordCaptor.getValue(); + Headers headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(10); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + + recoverer.excludeHeader(HeaderNames.HeadersToAdd.TOPIC); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + verify(template, atLeastOnce()).send(producerRecordCaptor.capture()); + outRecord = producerRecordCaptor.getValue(); + headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(9); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + + recoverer.excludeHeader((HeaderNames.HeadersToAdd.PARTITION)); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + verify(template, atLeastOnce()).send(producerRecordCaptor.capture()); + outRecord = producerRecordCaptor.getValue(); + headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(8); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotNull(); + 
assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + + recoverer.excludeHeader((HeaderNames.HeadersToAdd.OFFSET)); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + verify(template, atLeastOnce()).send(producerRecordCaptor.capture()); + outRecord = producerRecordCaptor.getValue(); + headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(7); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + + recoverer.excludeHeader((HeaderNames.HeadersToAdd.TS)); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + verify(template, atLeastOnce()).send(producerRecordCaptor.capture()); + outRecord = producerRecordCaptor.getValue(); + headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(6); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + + recoverer.excludeHeader((HeaderNames.HeadersToAdd.TS_TYPE)); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + verify(template, atLeastOnce()).send(producerRecordCaptor.capture()); + outRecord = producerRecordCaptor.getValue(); + headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(5); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotNull(); + 
assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + + recoverer.excludeHeader((HeaderNames.HeadersToAdd.GROUP)); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + verify(template, atLeastOnce()).send(producerRecordCaptor.capture()); + outRecord = producerRecordCaptor.getValue(); + headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(4); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + + recoverer.excludeHeader((HeaderNames.HeadersToAdd.EXCEPTION)); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + verify(template, atLeastOnce()).send(producerRecordCaptor.capture()); + outRecord = producerRecordCaptor.getValue(); + headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(3); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + + recoverer.excludeHeader((HeaderNames.HeadersToAdd.EX_CAUSE)); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + verify(template, atLeastOnce()).send(producerRecordCaptor.capture()); + outRecord = producerRecordCaptor.getValue(); + headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(2); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNull(); + 
assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + + recoverer.excludeHeader((HeaderNames.HeadersToAdd.EX_MSG)); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + verify(template, atLeastOnce()).send(producerRecordCaptor.capture()); + outRecord = producerRecordCaptor.getValue(); + headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(1); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + + recoverer.excludeHeader(HeaderNames.HeadersToAdd.EX_STACKTRACE); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + verify(template, atLeastOnce()).send(producerRecordCaptor.capture()); + outRecord = producerRecordCaptor.getValue(); + headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(0); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNull(); + + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + void headerCreator() { + KafkaOperations template = mock(KafkaOperations.class); + ListenableFuture future = mock(ListenableFuture.class); + given(template.send(any(ProducerRecord.class))).willReturn(future); + ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); + DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); + recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> { + kafkaHeaders.add(new RecordHeader("foo", "bar".getBytes())); + }); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + 
ArgumentCaptor producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class); + verify(template, atLeastOnce()).send(producerRecordCaptor.capture()); + ProducerRecord outRecord = producerRecordCaptor.getValue(); + Headers headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(7); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNotNull(); + assertThat(headers.lastHeader("foo")).isNotNull(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + void addHeaderFunctionsProcessedInOrder() { + KafkaOperations template = mock(KafkaOperations.class); + ListenableFuture future = mock(ListenableFuture.class); + given(template.send(any(ProducerRecord.class))).willReturn(future); + ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); + DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); + recoverer.setHeadersFunction((rec, ex) -> { + return new RecordHeaders(new RecordHeader[] { new RecordHeader("foo", "one".getBytes()) }); + }); + recoverer.addHeadersFunction((rec, ex) -> { + return new RecordHeaders(new RecordHeader[] { new RecordHeader("bar", "two".getBytes()) }); + }); + recoverer.addHeadersFunction((rec, ex) -> { + return new RecordHeaders(new RecordHeader[] { new RecordHeader("foo", "three".getBytes()) }); + }); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + ArgumentCaptor producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class); + verify(template).send(producerRecordCaptor.capture()); + ProducerRecord outRecord = producerRecordCaptor.getValue(); + Headers headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(13); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotNull(); + assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + assertThat(headers.headers("foo")).extracting("value").containsExactly("one".getBytes(), "three".getBytes()); + assertThat(headers.lastHeader("bar")).extracting("value").isEqualTo("two".getBytes()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + void immutableHeaders() { + KafkaOperations template = mock(KafkaOperations.class); + ListenableFuture future = mock(ListenableFuture.class); + 
given(template.send(any(ProducerRecord.class))).willReturn(future); + ConsumerRecord record = new ConsumerRecord<>("foo", 0, 0L, "bar", null); + DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); + recoverer.setHeadersFunction((rec, ex) -> { + RecordHeaders headers = new RecordHeaders(new RecordHeader[] { new RecordHeader("foo", "one".getBytes()) }); + headers.setReadOnly(); + return headers; + }); + recoverer.addHeadersFunction((rec, ex) -> { + return new RecordHeaders(new RecordHeader[] { new RecordHeader("bar", "two".getBytes()) }); + }); + recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException())); + ArgumentCaptor producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class); + verify(template).send(producerRecordCaptor.capture()); + ProducerRecord outRecord = producerRecordCaptor.getValue(); + Headers headers = outRecord.headers(); + assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(12); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java index 443d6b8367..a502eba63d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java @@ -37,6 +37,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; @@ -200,21 +201,25 @@ void shouldIncreaseAttemptsInNewHeader() { } @Test - void shouldAddOriginalTimestampHeader() { + void shouldAddOriginalTimestampHeaderAndCustom() { // setup RuntimeException e = new RuntimeException(); ConsumerRecord consumerRecord = new ConsumerRecord(testTopic, 0, 0, originalTimestamp, TimestampType.CREATE_TIME, -1, -1, key, value, new RecordHeaders(), Optional.empty()); - given(destinationTopicResolver.resolveDestinationTopic(testTopic, 1, e, originalTimestamp)).willReturn(destinationTopic); + given(destinationTopicResolver.resolveDestinationTopic(testTopic, 1, e, originalTimestamp)) + .willReturn(destinationTopic); given(destinationTopic.isNoOpsTopic()).willReturn(false); given(destinationTopic.getDestinationName()).willReturn(testRetryTopic); given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic); willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations(); given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture); - DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver); + DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory( + this.destinationTopicResolver); + factory.setHeadersFunction( + (rec, ex) -> new RecordHeaders(new RecordHeader[] { new RecordHeader("foo", "bar".getBytes()) })); // when DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = factory.create(); @@ -223,9 +228,11 @@ void shouldAddOriginalTimestampHeader() { // then 
then(kafkaOperations).should(times(1)).send(producerRecordCaptor.capture()); ProducerRecord producerRecord = producerRecordCaptor.getValue(); - Header originalTimestampHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP); + Header originalTimestampHeader = producerRecord.headers() + .lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP); assertThat(originalTimestampHeader).isNotNull(); assertThat(new BigInteger(originalTimestampHeader.value()).longValue()).isEqualTo(this.nowTimestamp); + assertThat(producerRecord.headers().lastHeader("foo")).extracting("value").isEqualTo("bar".getBytes()); } @Test