KafkaMessageListenerContainer Observation Scope seems to be setup incorrectly #3686

Closed
leaf-upper opened this issue Dec 16, 2024 · 10 comments · Fixed by #3689

@leaf-upper

leaf-upper commented Dec 16, 2024

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3.0

Describe the bug

Recently, our application was upgraded from spring-kafka 3.2.4 to spring-kafka 3.3.0.
In the doInvokeRecordListener method, when a message fails, the observation scope is already closed, so the logging data (for example, MDC) has disappeared.

//spring-kafka 3.3.0
observation.start();
try (Observation.Scope ignored = observation.openScope()) {
  invokeOnMessage(cRecord);
  successTimer(sample, cRecord);
  recordInterceptAfter(cRecord, null);
}
catch (RuntimeException e) {
  failureTimer(sample, cRecord); // the observation scope is already closed here, so the MDC (logging data) has been cleared
  recordInterceptAfter(cRecord, e);
  ...

But in spring-kafka 3.2.4, the observation scope is not closed when a message fails, and the logging information is maintained.

//spring-kafka 3.2.4
return observation.observe(() -> {
  try {
    invokeOnMessage(cRecord);
    successTimer(sample, cRecord);
    recordInterceptAfter(cRecord, null);
  }
  catch (RuntimeException e) {
    failureTimer(sample, cRecord);
    recordInterceptAfter(cRecord, e);
    ...

In conclusion, is it possible to roll back to the original code?

Expected behavior

I expect the logging data to be maintained when a message fails, so that it can be logged in the RecordInterceptor.

@artembilan
Member

Hold on.
I don't understand why the scope is closed.
According to the try-with-resources specification, it is closed from the finally block: https://docs.oracle.com/javase/specs/jls/se8/html/jls-14.html#jls-14.20.3
So, how did it turn out to be closed for that failureTimer()?
What am I missing?
Plus, you talk about a RecordInterceptor. How is that involved in this observation logic?
Maybe you can share with us a simple project where we can reproduce it?

Thanks

@cyberflohr

We see exactly the same issue in our project after updating to Spring Boot 3.4.0 (which uses Spring Kafka 3.3.0).
If an exception occurs during message processing, the error handlers are executed without any MDC trace info.

According to the try-with-resources spec, doInvokeRecordListener uses an "extended try-with-resources" statement (https://docs.oracle.com/javase/specs/jls/se8/html/jls-14.html#jls-14.20.3.2), for which the spec states the following:

Furthermore, all resources will have been closed (or attempted to be closed) by the time the finally block is executed
IMO the scope is already closed when the finally block gets executed.
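A minimal, self-contained sketch of the evaluation order that JLS 14.20.3.2 describes for an extended try-with-resources statement. TracingScope is a hypothetical resource that only records events; it is not Observation.Scope. The point it shows is that close() runs before the catch block, not only before the finally block.

```java
import java.util.ArrayList;
import java.util.List;

class TryWithResourcesOrder {

    // Hypothetical resource standing in for Observation.Scope; it only records events.
    static final class TracingScope implements AutoCloseable {
        private final List<String> events;

        TracingScope(List<String> events) {
            this.events = events;
            events.add("open");
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    // Records the order in which the try body, close(), catch and finally run.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        try (TracingScope ignored = new TracingScope(events)) {
            events.add("body");
            throw new IllegalStateException("listener failed");
        }
        catch (IllegalStateException e) {
            // The resource has already been closed when this block starts.
            events.add("catch");
        }
        finally {
            events.add("finally");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [open, body, close, catch, finally]
    }
}
```

Per the spec, the extended form behaves like a basic try-with-resources nested inside an outer try..catch..finally, which is why "close" appears before "catch".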


@artembilan
Member

The previous logic was:

return observation.observe(() -> {
    try {
        invokeOnMessage(cRecord);
        successTimer(sample, cRecord);
        recordInterceptAfter(cRecord, null);
    }
    catch (RuntimeException e) {
        failureTimer(sample, cRecord);
        recordInterceptAfter(cRecord, e);
        if (this.commonErrorHandler == null) {
            throw e;
        }
        observation.error(e);
        try {
            invokeErrorHandler(cRecord, iterator, e);
            commitOffsetsIfNeededAfterHandlingError(cRecord);
        }
        catch (KafkaException ke) {
            ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
            return ke;
        }
        catch (RuntimeException ee) {
            this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
            return ee;
        }
        catch (Error er) { // NOSONAR
            this.logger.error(er, "Error handler threw an error");
            throw er;
        }
    }
    return null;
});

Current one is:

			observation.start();
			try (Observation.Scope ignored = observation.openScope()) {
				invokeOnMessage(cRecord);
				successTimer(sample, cRecord);
				recordInterceptAfter(cRecord, null);
			}
			catch (RuntimeException e) {
				failureTimer(sample, cRecord);
				recordInterceptAfter(cRecord, e);
				if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
					observation.error(e);
				}
				if (this.commonErrorHandler == null) {
					throw e;
				}
				try {
					invokeErrorHandler(cRecord, iterator, e);
					commitOffsetsIfNeededAfterHandlingError(cRecord);
				}
				catch (RecordInRetryException rire) {
					this.logger.info("Record in retry and not yet recovered");
					return rire;
				}
				catch (KafkaException ke) {
					ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
					return ke;
				}
				catch (RuntimeException ee) {
					this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
					return ee;
				}
				catch (Error er) { // NOSONAR
					this.logger.error(er, "Error handler threw an error");
					throw er;
				}
			}
			finally {
				if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
					observation.stop();
				}
			}
			return null;
		}

Which is semantically the same.

What part of our new logic is losing the scope?
That failureTimer(sample, cRecord); is within the scope.
The invokeErrorHandler(cRecord, iterator, e); is still there anyway.
The observation.observe() implementation is like this:

    default <T> T observe(Supplier<T> supplier) {
        start();
        try (Scope scope = openScope()) {
            return supplier.get();
        }
        catch (Throwable error) {
            error(error);
            throw error;
        }
        finally {
            stop();
        }
    }

What am I missing?
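A minimal sketch of where the two shapes differ, under stated assumptions: a plain ThreadLocal stands in for the MDC/trace context, and a hypothetical Scope class stands in for Observation.Scope (neither is the Micrometer or Spring Kafka API). In the 3.2.4 shape the listener's catch block lives inside the supplier, and so inside the scope; in the 3.3.0 shape the catch is attached to the try-with-resources statement itself, which closes the resource before the catch runs.

```java
import java.util.function.Supplier;

class ScopeShapes {

    // Hypothetical stand-in for the MDC / tracing context that an open scope populates.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Hypothetical stand-in for Observation.Scope: sets the context on open, clears it on close.
    static final class Scope implements AutoCloseable {
        Scope(String traceId) {
            TRACE_ID.set(traceId);
        }

        @Override
        public void close() {
            TRACE_ID.remove();
        }
    }

    // Mirrors the shape of Observation.observe(): the supplier runs inside the scope.
    static <T> T observe(Supplier<T> supplier) {
        try (Scope ignored = new Scope("abc123")) {
            return supplier.get();
        }
    }

    // 3.2.4 shape: the try/catch is inside the supplier, so the catch still sees the context.
    static String oldShape() {
        return observe(() -> {
            try {
                throw new RuntimeException("listener failed");
            }
            catch (RuntimeException e) {
                return TRACE_ID.get(); // what failureTimer()/recordInterceptAfter() would see
            }
        });
    }

    // 3.3.0 shape: the catch belongs to the try-with-resources, so the scope is already closed.
    static String newShape() {
        try (Scope ignored = new Scope("abc123")) {
            throw new RuntimeException("listener failed");
        }
        catch (RuntimeException e) {
            return TRACE_ID.get();
        }
    }

    public static void main(String[] args) {
        System.out.println("old: " + oldShape()); // old: abc123
        System.out.println("new: " + newShape()); // new: null
    }
}
```

Both methods open and close the same kind of scope; only the position of the catch block relative to the resource's close() changes.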

@sobychacko
Contributor

@cyberflohr The following means that when the catch block is executed, the scope is still open. It is only by the time the finally block is executed that the resources are cleaned up; in this case, the scope is closed.

Furthermore, all resources will have been closed (or attempted to be closed) by the time the finally block is executed
IMO the scope is already closed when the finally block gets executed.

So, we are not sure what the issue is. It would be great if we could reproduce the issue via a simple application.

@leaf-upper
Author

leaf-upper commented Dec 17, 2024

@sobychacko @artembilan
The application for testing is here: https://github.com/leaf-upper/spring-kafka-observation-test
Please put your Kafka broker servers property in application.yaml and publish a message to the dev-test topic.
There are two topics: dev-test and dev-test-retry.


To summarize my case:
Previously, with spring-kafka 3.2.4, the LoggingInterceptor.failure() method printed the traceId normally: it existed in the MDC context, and the traceId was propagated into the non-blocking retry context.
[Screenshot 2024-12-17 11:00:00 AM]

But now, with spring-kafka 3.3.0, the LoggingInterceptor.failure() method does not have the traceId in the MDC context, and the traceId is not propagated into the non-blocking retry context.
[Screenshot 2024-12-17 11:01:23 AM]

I think the difference between before and after comes from the change to the observation scope code.
My guess is that the scope is closed when the try block ends, and the MDC is cleared at that point as well.

@cyberflohr

cyberflohr commented Dec 17, 2024

I debugged the KafkaMessageListenerContainer code and marked the differences in the code (see the image below).
IMO it's pretty normal that the scope is closed after the try block is executed; it's the standard scoping behaviour for variables, which also applies to e.g. if/else/for/while.

FYI @artembilan @sobychacko

[Image: debugger screenshot of KafkaMessageListenerContainer with the differences marked]

@cyberflohr

If you need a simple application to reproduce the problem, just let me know :-)

@lupor

lupor commented Dec 17, 2024

Hold on. I don't understand why the scope is closed. According to the try-with-resources specification, it is closed from the finally block: https://docs.oracle.com/javase/specs/jls/se8/html/jls-14.html#jls-14.20.3 So, how did it turn out to be closed for that failureTimer()? What am I missing? Plus, you talk about a RecordInterceptor. How is that involved in this observation logic? Maybe you can share with us a simple project where we can reproduce it?

Thanks

I don't think that this is an accurate interpretation of the spec. The resource is closed right after the try block, which means that it is already closed by the time the catch block, or the finally block for that matter, is executed. At least that is how I read it.

I had a superficial look at the Micrometer codebase and saw this comment on the Scope interface:

Scope represent an action within which certain resources (e.g. tracing context) are
put in scope (e.g. in a ThreadLocal). When the scope is closed the resources will
be removed from the scope.
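A minimal sketch of the behaviour that javadoc describes, assuming a plain ThreadLocal<String> as the context holder. ThreadLocalScope is a hypothetical class, not Micrometer's implementation, and restoring the previous value on close() is an assumed design choice so that nested scopes unwind correctly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical ThreadLocal-backed scope; not Micrometer's implementation.
class ThreadLocalScope implements AutoCloseable {

    // Hypothetical context holder (stand-in for a tracing context or MDC entry).
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Value that was in scope before this scope was opened.
    private final String previous;

    private ThreadLocalScope(String value) {
        this.previous = CURRENT.get();
        CURRENT.set(value); // put the resource "in scope"
    }

    static ThreadLocalScope open(String value) {
        return new ThreadLocalScope(value);
    }

    static String current() {
        return CURRENT.get();
    }

    @Override
    public void close() {
        // Remove the resource from scope by restoring whatever was there before.
        if (previous == null) {
            CURRENT.remove();
        }
        else {
            CURRENT.set(previous);
        }
    }

    // Records what is in scope at each point of two nested scopes.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        try (ThreadLocalScope outer = open("outer")) {
            try (ThreadLocalScope inner = open("inner")) {
                seen.add(current());
            }
            seen.add(current());
        }
        seen.add(current());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [inner, outer, null]
    }
}
```

Anything that reads the ThreadLocal after close() has run, including a catch block attached to the same try-with-resources, sees the context as already removed.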

@artembilan
Member

Thank you for the sample!
I was indeed able to reproduce it.
So, it looks like we have treated try-with-resources the wrong way.
I see that the observation scope is already closed by the time the catch block is reached 🤷.

Apparently we have to manage close() manually as well.
No try-with-resources, just a regular try..catch..finally.

Will fix it shortly and test with your sample again.
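A minimal sketch of the approach described here (open the scope before a regular try and close it manually in finally). It again uses a hypothetical ThreadLocal-backed Scope in place of Observation.Scope; the real container also stops the observation, which is omitted, and the "success"/"failure" entries are only stand-ins for successTimer()/failureTimer() and recordInterceptAfter().

```java
import java.util.ArrayList;
import java.util.List;

class ManualScopeClose {

    // Hypothetical stand-in for the MDC / tracing context populated by an open scope.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Hypothetical stand-in for Observation.Scope.
    static final class Scope implements AutoCloseable {
        Scope(String traceId) {
            TRACE_ID.set(traceId);
        }

        @Override
        public void close() {
            TRACE_ID.remove();
        }
    }

    // Opens the scope before the try and closes it in finally, so the catch block still runs in scope.
    static List<String> invoke(Runnable listener) {
        List<String> seen = new ArrayList<>();
        Scope scope = new Scope("abc123");
        try {
            listener.run();
            seen.add("success:" + TRACE_ID.get());
        }
        catch (RuntimeException e) {
            // Stand-in for failureTimer()/recordInterceptAfter(): the context is still present.
            seen.add("failure:" + TRACE_ID.get());
        }
        finally {
            scope.close();
        }
        // Code after the method's scope (like afterRecord()) no longer sees the context.
        seen.add("after:" + TRACE_ID.get());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(invoke(() -> { throw new IllegalStateException("listener failed"); }));
        // [failure:abc123, after:null]
    }
}
```

The trade-off compared with try-with-resources is that close() is no longer automatic; placing it in finally keeps it on every path while moving it after the catch block.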

@artembilan
Member

So, now it is better:

2024-12-17T11:09:28.046-05:00  INFO 38000 --- [tion-test-0-C-1] [6761a2388490ae903169554456022367-3169554456022367] c.u.test.Application$LoggingInterceptor  : intercept traceId=6761a2388490ae903169554456022367
2024-12-17T11:09:28.050-05:00  INFO 38000 --- [tion-test-0-C-1] [6761a2388490ae903169554456022367-3169554456022367] c.u.test.Application$LoggingInterceptor  : failure traceId=6761a2388490ae903169554456022367
2024-12-17T11:09:28.064-05:00  INFO 38000 --- [tion-test-0-C-1] [6761a2388490ae903169554456022367-3169554456022367] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-kafka-observation-test-1, groupId=kafka-observation-test] Seeking to offset 0 for partition dev-test-1
2024-12-17T11:09:28.065-05:00  INFO 38000 --- [tion-test-0-C-1] [6761a2388490ae903169554456022367-3169554456022367] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2024-12-17T11:09:28.065-05:00  INFO 38000 --- [tion-test-0-C-1] [                                -                ] c.u.test.Application$LoggingInterceptor  : after traceId=null

afterRecord() is not part of doInvokeRecordListener(), so it is expected not to have a scope there.

@artembilan artembilan added this to the 3.3.2 milestone Dec 17, 2024
@artembilan artembilan self-assigned this Dec 17, 2024
artembilan added a commit to artembilan/spring-kafka that referenced this issue Dec 17, 2024
…ssageListenerContainer`

Fixes: spring-projects#3686

According to our investigation around the `try-with-resource`,
it looks like the resource is already closed when we reach the `catch` block.

* Rework `KafkaMessageListenerContainer.ListenerConsumer.doInvokeRecordListener()`
to `observation.openScope()` before the `try` and close it manually in the `finally` block
artembilan added a commit to artembilan/spring-kafka that referenced this issue Dec 17, 2024
…Container`

Fixes: spring-projects#3686

According to our investigation around the `try-with-resource`,
it looks like the resource is already closed when we reach the `catch` block.

* Rework `KafkaMessageListenerContainer.ListenerConsumer.doInvokeRecordListener()`
to `observation.openScope()` before the `try` and close it manually in the `finally` block
* Verify `RecordInterceptor.failure()` has a scope in the `ObservationTests`
5 participants