
Trace information is missing when Exception is thrown from KafkaListener methods #1704


Closed
jonatan-ivanov opened this issue Feb 9, 2021 · 13 comments · Fixed by #1946

Comments

@jonatan-ivanov
Member

jonatan-ivanov commented Feb 9, 2021

Affects Version(s): All currently supported versions (including 2.6.5)

🐞 Bug report

The issue was originally reported for Spring Cloud Sleuth, but it seems it is not a Sleuth issue (and I can't transfer issues across orgs), so I'm opening this one to track it in the right place.

The original issue is spring-cloud/spring-cloud-sleuth#1659, opened by @m-grzesiak.
It contains the details needed to understand what is going on, a sample project that reproduces the issue, and some investigation details. The issue is very similar to spring-cloud/spring-cloud-sleuth#1660

Description
When an exception is thrown from a method annotated with @KafkaListener, all tracing information is lost by the time it reaches the error handler, so the error logs do not contain tracing information.

Sample: https://github.com/jonatan-ivanov/sleuth-gh-1659
The sample project uses spring-kafka 2.6.4, but the issue should be present in the latest spring-kafka (2.6.5) too.

Investigation details: spring-cloud/spring-cloud-sleuth#1659 (also see spring-cloud/spring-cloud-sleuth#1660)
Possible fix (breaking change): should be similar to spring-projects/spring-amqp#1287
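
A minimal sketch of the reported scenario (the class names, topic, and error handler below are illustrative and are not taken from the linked sample; how the handler is registered with the container factory is omitted):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.stereotype.Component;

// The listener fails: while it runs, the Sleuth trace context (traceId/spanId) is on the MDC.
@Component
class FailingListener {

    private static final Logger log = LoggerFactory.getLogger(FailingListener.class);

    @KafkaListener(topics = "demo-topic")
    public void listen(String payload) {
        log.info("Processing {}", payload);      // this log line carries traceId/spanId
        throw new IllegalStateException("boom"); // propagated to the container's error handler
    }
}

// By the time the container invokes the error handler, the trace context is gone,
// so these error logs (and the container's own error logs) have no traceId/spanId.
@Component
class LoggingErrorHandler implements ErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(LoggingErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        log.error("Failed to process {}", record, thrownException); // no trace context here
    }
}
```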

@MaksimMyshkin

Also waiting for a fix. It was unexpected that the ErrorHandler does not get the original traceId context from Sleuth.

@kdowbecki
Contributor

kdowbecki commented Aug 11, 2021

My requirement is that all logs related to a particular batch of processed Kafka records should share the same trace context.

For @KafkaListener in batch mode, Sleuth by default doesn't take the trace context from the b3 header of individual records and instead generates a single trace that represents the entire batch. I like this.

However, I'd like to start the trace earlier, just before the Kafka Consumer poll() is called, and end it later, after BatchErrorHandler finishes handling the exception. That way I can have a single trace context spanning all operations related to a particular batch of records.

Right now there is no mechanism to intercept the KafkaMessageListenerContainer.ListenerConsumer.run() method. If there were an interceptor that allowed doing something around the pollAndInvoke() method, I could add my own trace context code. Since the listener is invoked on the same thread, this should be easy; I don't have to consider any reactive pattern for my solution.

Currently there are a few scenarios that lose the trace context:

  1. Exceptions thrown by a @KafkaListener that is annotated with @NewSpan.
  2. Exceptions thrown by a Key/Value Deserializer class if we specify a spring.deserializer.value.function that re-throws the exception.
  3. Logs emitted directly by KafkaMessageListenerContainer.ListenerConsumer, e.g. "Error handler threw an exception".

It would be great if somehow all of these scenarios were smart enough to understand that a single pollAndInvoke() iteration should share the same trace context. Does this make sense? I can try to raise a PR to achieve it, but I wonder whether this is the right direction for Spring Kafka.
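
To make the ask concrete, here is a rough sketch of what such per-iteration hooks could be used for, assuming hypothetical beforePoll()/afterIteration() callbacks around pollAndInvoke() (neither the hook names nor the "traceId" MDC key are real API here):

```java
import java.util.UUID;

import org.slf4j.MDC;

// Hypothetical hooks: if the container called these around each pollAndInvoke() iteration,
// a single trace context could cover the poll, deserialization, the listener invocation
// and the error handler for one batch of records.
class PerPollTracing {

    // would run just before Consumer.poll()
    void beforePoll() {
        MDC.put("traceId", UUID.randomUUID().toString().replace("-", ""));
    }

    // would run after the whole iteration, including BatchErrorHandler
    void afterIteration() {
        MDC.remove("traceId");
    }
}
```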

@garyrussell
Contributor

@kdowbecki I wonder if the BatchInterceptor with additional methods beforePoll() and afterProcessing() would satisfy your needs?

I don't see an easy way to handle the @NewSpan case, though.

Similarly for RecordInterceptor for record listeners (although the beforePoll() might not be so useful there).
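
For illustration, the shape being suggested could look roughly like this (a sketch only: the existing intercept()/success()/failure() signatures are reproduced from memory, and the two new methods were later renamed, as the commit trail further down shows):

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Sketch of the proposed extension to BatchInterceptor; not the real spring-kafka interface.
interface BatchInterceptorSketch<K, V> {

    // existing contract: called with the polled records before the listener is invoked
    ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer);

    default void success(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
    }

    default void failure(ConsumerRecords<K, V> records, Exception exception, Consumer<K, V> consumer) {
    }

    // new: called just before Consumer.poll(), to set up thread-bound state such as the MDC
    default void beforePoll(Consumer<K, V> consumer) {
    }

    // new: called after all processing for the poll is complete, including error handlers
    default void afterProcessing(Consumer<K, V> consumer) {
    }
}
```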

@kdowbecki
Contributor

@garyrussell Thanks for suggesting BatchInterceptor, but unfortunately it doesn't fully cover the batch processing logic. BatchErrorHandler is invoked after the BatchInterceptor.failure() method runs. Since BatchErrorHandler throws an exception to signal errors, this results in ListenerConsumer logging "Error handler threw an exception" without any trace information assigned. That happens with BatchInterceptor because I have to clean up the trace context in the failure() method.

BatchInterceptor would work if the failure() method were called after BatchErrorHandler executes and ListenerConsumer logged no further messages related to a particular batch. My goal is to have a single trace context for the batch processing, including error handling; I don't care about the consumer start/stop/fenced events logged by ListenerConsumer.
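
In other words, a tracing interceptor written against the current callbacks has to look something like this sketch (the callback names mirror BatchInterceptor, but the exact signatures are an assumption here), and the only place to clean up on error is failure(), which runs before the error handler:

```java
import java.util.UUID;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.MDC;

// Illustrates the ordering problem: the MDC must be cleared in failure(), but the container
// only calls BatchErrorHandler afterwards, so the error handler's logs (and the subsequent
// "Error handler threw an exception" log) carry no trace context.
class EarlyCleanupTracingInterceptor<K, V> {

    public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
        MDC.put("traceId", UUID.randomUUID().toString().replace("-", "")); // one trace per batch
        return records;
    }

    public void success(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
        MDC.remove("traceId"); // fine: nothing else is logged for this batch
    }

    public void failure(ConsumerRecords<K, V> records, Exception exception, Consumer<K, V> consumer) {
        MDC.remove("traceId"); // too early: BatchErrorHandler has not run yet
    }
}
```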

@garyrussell
Contributor

That is exactly why I suggested adding new methods to the interceptor: one called before the poll and the other after all processing is complete (including error handlers).

@kdowbecki
Contributor

kdowbecki commented Aug 16, 2021

@garyrussell That would work for me. Are the new methods planned for an upcoming release (e.g. tracked in some other open issue)? If not, is it OK if I raise a PR?

@garyrussell
Contributor

garyrussell commented Aug 16, 2021

No, there is no other issue. The next 2.8 milestone is due Sept 20. Since this would not be a breaking change (assuming the new methods are default), we can back-port it to 2.7.x, the next release of which is due the same day.

Thanks for the offer of a PR.

We need to enhance RecordInterceptor too.

@garyrussell modified the milestones: Backlog, 2.8.0-M3 on Aug 16, 2021
kdowbecki pushed a commit to kdowbecki/spring-kafka that referenced this issue Aug 17, 2021
@kdowbecki
Contributor

kdowbecki commented Aug 17, 2021

@garyrussell Does kdowbecki@90c9138 look like what you were thinking about? I'd appreciate early feedback before I dive deeper into it; this is my first time contributing to Spring Kafka.

I'm not fully getting the concept of sub-batches in ListenerConsumer.doPoll(). It looks like we are iterating with batchIterator over Kafka partitions, so I don't think we should be calling the new BatchInterceptor.beforePoll() method there; that would be similar to the existing BatchInterceptor.intercept() method, which we already have. Unless I'm wrong and we should call beforePoll() before each sub-batch?

kdowbecki pushed a commit to kdowbecki/spring-kafka that referenced this issue Aug 17, 2021
@garyrussell
Contributor

Looks good to me, with a few suggestions.

  1. In the container, you have calls to batchInterceptBeforePoll() and beforePoll() for records. I think this can be a single call to beforePoll() before we check the listener type.
  2. It would be more aesthetically pleasing if beforePoll() is first in the interceptors.
  3. We need to handle the poll() throwing an exception, with a conditional call to finishInvoke() in a finally block in the run() method: e.g. a boolean success set to false just after the while(), set to true after pollAndInvoke(), and used in the finally block to call the cleanup if an exception occurred (a rough sketch of this pattern follows below).
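
A minimal, self-contained illustration of the pattern in suggestion 3 (this is not the actual ListenerConsumer code; pollAndInvoke() and clearThreadState() are stand-ins):

```java
// Success-flag-plus-finally pattern: if poll() (or anything before the listener) throws,
// the thread-bound state set up for the iteration still gets cleared.
class PollLoopSketch {

    private volatile boolean running = true;

    void run() {
        boolean success = true; // nothing to clean up if the loop never runs
        try {
            while (running) {
                success = false;
                pollAndInvoke(); // may throw before the interceptor's normal cleanup runs
                success = true;
            }
        }
        finally {
            if (!success) {
                clearThreadState(); // conditional cleanup only when the last iteration failed
            }
        }
    }

    private void pollAndInvoke() {
        // poll the consumer and invoke the listener (placeholder)
    }

    private void clearThreadState() {
        // e.g. interceptor cleanup such as clearing the tracing MDC (placeholder)
    }
}
```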

I also think we should beef up the javadocs, e.g. for beforePoll(): "You can use this method to set up thread-bound resources, such as logging MDC. This allows components used in the poll, e.g. Deserializers and ConsumerInterceptors, to access these resources."

And for the after... "Use this method to clean up any thread-bound resources set up by the interceptor".

I think it would be best to proceed to a PR so we can comment directly there (it's much easier for us that way).

Thanks

kdowbecki pushed a commit to kdowbecki/spring-kafka that referenced this issue Aug 18, 2021
@kdowbecki
Contributor

Thanks for the suggestions; I applied them and opened a PR: #1912. Let's move the discussion there.

kdowbecki pushed a commit to kdowbecki/spring-kafka that referenced this issue Sep 13, 2021
kdowbecki pushed a commit to kdowbecki/spring-kafka that referenced this issue Sep 13, 2021
garyrussell pushed a commit that referenced this issue Sep 17, 2021
* GH-1704: Broader Batch/RecordInterceptor

* GH-1704: More pessimistic finally

* Precise logger message

* Renaming methods and improving Javadoc.

Still need to work on race condition in the tests because beforePoll() and afterRecordsProcessed() is called while the test is setting up data.

* Fixing tests

* Fixing tests

* Moving beforePoll() and adding InOrder tests

* Reverting integration tests

* Extracting BeforeAfterPollProcessor

* Renaming afterPoll to clearThreadState

* Fixing compiler warnings in tests
@kdowbecki
Contributor

kdowbecki commented Sep 17, 2021

@garyrussell One interesting aspect of this issue is RetryingBatchErrorHandler, which polls the consumer during retries. I'm not 100% sure whether the BeforeAfterPollProcessor should be called there, since this is a fake poll to keep the broker happy. The RetryingBatchErrorHandler retry happens on the same thread as the original record processing, so from the tracing perspective it's the same trace.

@garyrussell
Contributor

Right; the poll() there is simply to keep the consumer alive and prevent a rebalance. No new records will ever be returned by that poll because the consumer is paused.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Sep 17, 2021
Resolves spring-projects#1704

- Extract `ThreadStateProcessor` and `ConsumerAwareThreadStateProcessor`
- Avoid all the if/else tests around calling the interceptor common methods
- Add `afterRecord` to the record interceptor so sleuth can defer cleaning up until after
  the error handler.
@garyrussell
Contributor

Polishing PR: #1946

artembilan pushed a commit that referenced this issue Sep 20, 2021
Resolves #1704

- Extract `ThreadStateProcessor`
- Avoid all the if/else tests around calling the interceptor common methods
- Add `afterRecord` to the record interceptor so sleuth can defer cleaning up until after
  the error handler.

* Remove ThreadStateProcessor (TSP), rename CATSP to TSP.

- confusing for interceptor implementors
- no components ever call `setupThreadState()`; ARP and CEH build state during normal processing.
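
To see how tracing instrumentation might use the final hooks, here is a sketch based on the commit messages above (the method names follow the commits; the exact signatures and the interface they belong to are assumptions, so this class deliberately does not claim to implement the real spring-kafka types):

```java
import java.util.UUID;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.MDC;

// Sleuth-style usage of the new hooks: set up thread-bound trace state before the poll,
// keep it alive through listener invocation and error handling, and clear it afterwards.
class TracingThreadStateSketch<K, V> {

    // called before Consumer.poll(): bind a trace context to the consumer thread
    public void setupThreadState(Consumer<?, ?> consumer) {
        MDC.put("traceId", UUID.randomUUID().toString().replace("-", ""));
    }

    // called after a record has been handled, including the error handler, so error logs
    // emitted in between still carry the trace context (instrumentation can defer cleanup here)
    public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
    }

    // called once processing (and error handling) is complete: drop the thread-bound state
    public void clearThreadState(Consumer<?, ?> consumer) {
        MDC.remove("traceId");
    }
}
```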
kdowbecki pushed a commit to kdowbecki/spring-kafka that referenced this issue Sep 22, 2021
garyrussell pushed a commit that referenced this issue Sep 23, 2021