Skip to content

GH-2155: DeadLetterPublishingRecoverer Improvement #2161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 10, 2022

Conversation

garyrussell
Copy link
Contributor

@garyrussell garyrussell commented Mar 8, 2022

Resolves #2155

  • Add a BitSet property to suppress individual standard headers
  • Support multiple headersFunction
  • Allow complete customization of exception headers
  • Add setHeadersFunction to the DLPR factory for retryable topics

cherry-pick to 2.8.x

maybeAddHeader(kafkaHeaders, this.headerNames.original.timestampTypeHeader,
record.timestampType().toString().getBytes(StandardCharsets.UTF_8));
if (ex instanceof ListenerExecutionFailedException) {
if (this.whichHeaders.get(HeaderNames.HeadersToAdd.TOPIC.getBit())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it maybe be clearer if this if statement was inside the maybeAddHeaders method, and we passed the enum as an argument?

@@ -137,6 +150,9 @@ public DeadLetterPublishingRecoverer create() {
};

recoverer.setHeadersFunction((consumerRecord, e) -> addHeaders(consumerRecord, e, getAttempts(consumerRecord)));
if (this.headersFunction != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify, what I had in mind is maybe we could apply this headersFunction right here in the addHeaders method - this way we wouldn't need to make DLPR's header function composable and might make things simpler overall.

But also like this approach, which has the added benefit of warning the user if they set a headersFunction that would override RT's.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; I was going to do it here too, but I had already added to the code the DLPR and it would have been duplicated here - I also thought this could be useful in other scenarios, where a library has a "standard" corporate DLPR bean in a jar and a user wants to use that, plus add more headers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, makes total sense. Thanks.

/**
* The offset of the failed record.
*/
OFFSET(0),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like just an ordinal() is enough.
I mean all those explicit values for this enum is fully aligned with an ordinal() of the enum:


    /**
     * Returns the ordinal of this enumeration constant (its position
     * in its enum declaration, where the initial constant is assigned
     * an ordinal of zero).
     *
     * Most programmers will have no use for this method.  It is
     * designed for use by sophisticated enum-based data structures, such
     * as {@link java.util.EnumSet} and {@link java.util.EnumMap}.
     *
     * @return the ordinal of this enumeration constant
     */
    public final int ordinal() {

* Set a function that creates additional headers for the output record, in addition to the standard
* retry headers added by this factory.
* @param headersFunction the function.
* @since
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2.8.4 ?

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with this.

@tomazfernandes , @v-chernyshev ?

@tomazfernandes
Copy link
Contributor

I'm OK with this.

@tomazfernandes , @v-chernyshev ?

Looks great to me. I didn’t know this BitSet class, works perfectly for this use case. Thanks.

@@ -83,9 +88,11 @@

private final Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver;

private final BitSet whichHeaders = new BitSet(10);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can HeaderNames.HeadersToAdd.values().length be used here instead?

Copy link

@v-chernyshev v-chernyshev Mar 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. They say that a new array is allocated for every values() call, so I don't know whether it would make sense to also cache the length value...

One more question. What's the motivation for not using EnumSet<HeaderNames.HeadersToAdd> instead of BitSet? The documentation says that:

Enum sets are represented internally as bit vectors.

Then the whichHeaders set could be initialised with the help of allOf/noneOf static methods of EnumSet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading EnumSet JavaDocs, I concur that this might be a better way for our use-case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed; I wasn't aware of it.

@@ -83,9 +88,11 @@

private final Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver;

private final BitSet whichHeaders = new BitSet(10);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading EnumSet JavaDocs, I concur that this might be a better way for our use-case.

@@ -45,6 +46,7 @@
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames.HeadersToAdd;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm. There is no policy to avoid imports from the class that is declared in the same file, is there? There are two places that use HeadersToAdd rather than HeaderNames.HeadersToAdd, which causes this import statement to appear.

garyrussell and others added 7 commits March 10, 2022 08:49
Resolves spring-projects#2155

- Add a `BitSet` property to suppress individual standard headers
- Support multiple `headersFunction`
- Allow complete customization of exception headers
- Add `setHeadersFunction` to the DLPR factory for retryable topics
Co-authored-by: Viacheslav Chernyshev <[email protected]>
Comment on lines 401 to 403
if (headers2 != null) {
headers2.forEach(headers1::add);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's one minor point of concern with this implementation. Imagine that the original toCompose function does something similar to this:

Headers addHeaders(ConsumerRecord<...> record, Exception exception) {
    RecordHeaders extraHeaders = new RecordHeaders();
    extraHeaders.add("key", ...);
    extraHeaders.setReadOnly(); // This is the important line.

    return extraHeaders;
}

The returned value would be stored in headers1. headers2.forEach(headers1::add); would then fail with an IllegalStateException because headers1 can no longer be modified. A safer, albeit slightly more expensive, alternative could be:

// I believe the constructor is null-safe, but please double-check.
RecordHeaders headers1 = new RecordHeaders(toCompose.apply(rec, ex));
Headers headers2 = headersFunction.apply(rec, ex);
if (headers2 != null) {
    headers2.forEach(headers1::add);
}
return headers1;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unusual; but thanks.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I highly doubt anyone would actually do it, so I'm happy with the current implementation too.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with this.
Any other concerns before merging?

@tomazfernandes
Copy link
Contributor

I'm OK with this.
Any other concerns before merging?

Not from me, thanks. And thanks @v-chernyshev for the thorough review and bringing in the EnumSet.

@v-chernyshev
Copy link

v-chernyshev commented Mar 10, 2022

Not from me, thanks. And thanks @v-chernyshev for the thorough review and bringing in the EnumSet.

You are welcome, thanks for implementing this feature suggestion so quickly :) I do not have any further comments for the PR.

@artembilan artembilan merged commit f83fc4b into spring-projects:main Mar 10, 2022
@artembilan
Copy link
Member

... and cherry-picked to 2.8.x as f3f3eac after fixing conflicts.

@garyrussell garyrussell deleted the GH-2155 branch March 11, 2022 15:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

RetryableTopic - provide a mechanism to suppress non-essential headers
4 participants