Add possibility to remove custom headers in method: addHeadersFunction(...) from DeadLetterPublishingRecoverer #2528

Closed
wojciechcwek opened this issue Jan 3, 2023 · 0 comments · Fixed by #2529

wojciechcwek commented Jan 3, 2023

Expected Behavior
I'd like to be able to remove existing headers in addHeadersFunction(BiFunction<ConsumerRecord, Exception, Headers> headersFunction).

In addHeadersFunction(...) we have access to the consumer record. Removing a header from the consumer record has no effect, because the previous headers are kept in the headers variable inside the accept(...) method.

Current Behavior

Currently it's not possible to manipulate existing headers in addHeadersFunction(...).

Context

I want to delete the current custom header, because headers are passed along through all retry topics until the DLT topic. With 3 retry attempts, retry-0 has one custom header, retry-1 has two custom headers with the same key, and the DLT ends up with three. If there were an option to delete current headers in that function, I could check whether the custom header is already there and, if it is, delete it before adding it with the new value.

This happens because Kafka allows multiple headers with the same key.

My example:

    @Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
    public DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver destinationTopicResolver) {
        DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(destinationTopicResolver);
        factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> dlpr.addHeadersFunction((consumerRecord, exception) -> {
            Throwable rootCause = ExceptionUtils.getRootCause(exception);

            // The returned headers are added on top of the ones already carried by the record.
            return new RecordHeaders().add(EXTERNAL_SYSTEM_EXCEPTION_MESSAGE_HEADER, rootCause.getMessage().getBytes(StandardCharsets.UTF_8));
        }));
        return factory;
    }
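
To make the accumulation described above concrete, here is a minimal sketch in plain Java. It does not use the Kafka or Spring Kafka classes: `HeaderList` is a hypothetical stand-in for a header collection that permits duplicate keys, the key name is made up, and `simulate` is an illustrative helper that models a record passing through several topics. It only shows why appending at every hop yields duplicates while removing first keeps a single entry.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class HeaderAccumulationSketch {

    // Hypothetical stand-in for a single header: a key plus raw bytes.
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for a header collection that allows duplicate keys.
    static final class HeaderList {
        private final List<Header> entries = new ArrayList<>();

        // Appends a header without checking whether the key already exists.
        HeaderList add(String key, String value) {
            entries.add(new Header(key, value.getBytes(StandardCharsets.UTF_8)));
            return this;
        }

        // Removes every header with the given key.
        HeaderList remove(String key) {
            entries.removeIf(h -> h.key.equals(key));
            return this;
        }

        // Counts the headers stored under the given key.
        long count(String key) {
            return entries.stream().filter(h -> h.key.equals(key)).count();
        }
    }

    // Illustrative key name, not taken from the issue.
    static final String KEY = "external-system-exception-message";

    // Models a record travelling through `hops` topics, with the header added at each hop.
    static long simulate(int hops, boolean replaceExisting) {
        HeaderList headers = new HeaderList();
        for (int hop = 0; hop < hops; hop++) {
            if (replaceExisting) {
                headers.remove(KEY); // drop the value carried over from the previous hop
            }
            headers.add(KEY, "failure at hop " + hop);
        }
        return headers.count(KEY);
    }

    public static void main(String[] args) {
        System.out.println("append only: " + simulate(3, false));     // prints "append only: 3"
        System.out.println("remove then add: " + simulate(3, true));  // prints "remove then add: 1"
    }
}
```

With three hops (retry-0, retry-1, DLT) the append-only variant ends with three entries for the same key, which matches the behaviour reported here; the remove-then-add variant is the behaviour being requested.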
@garyrussell garyrussell self-assigned this Jan 3, 2023
@garyrussell garyrussell added this to the 3.0.2 milestone Jan 3, 2023
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jan 3, 2023
Resolves spring-projects#2528

When adding headers in a DLPR headers function, header values accumulate
because Kafka headers support multiple values.

Provide a mechanism to allow adding a header to replace any existing header
with that name.

**cherry-pick to 2.9.x**
artembilan pushed a commit that referenced this issue Jan 3, 2023
Resolves #2528

When adding headers in a DLPR headers function, header values accumulate
because Kafka headers support multiple values.

Provide a mechanism to allow adding a header to replace any existing header
with that name.

**cherry-pick to 2.9.x**

* Fix javadoc.