Skip to content

Commit eb7fa48

Browse files
committed
spring-projectsGH-3686: Fix observation scope closure in the KafkaMessageListenerContainer
Fixes: spring-projects#3686 According to our investigation around the `try-with-resource`, it looks like the resource is already closed when we reach the `catch` block. * Rework `KafkaMessageListenerContainer.ListenerConsumer.doInvokeRecordListener()` to `observation.openScope()` before the `try` and close it manually in the `finally` block
1 parent 4716ce1 commit eb7fa48

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -2750,7 +2750,6 @@ private void pauseForNackSleep() {
27502750
* @throws Error an error.
27512751
*/
27522752
@Nullable
2753-
@SuppressWarnings("try")
27542753
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cRecord, // NOSONAR
27552754
Iterator<ConsumerRecord<K, V>> iterator) {
27562755

@@ -2763,7 +2762,9 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27632762
this.observationRegistry);
27642763

27652764
observation.start();
2766-
try (Observation.Scope ignored = observation.openScope()) {
2765+
Observation.Scope observationScope = observation.openScope();
2766+
// We cannot use 'try-with-resource' because the resource is closed just before catch block
2767+
try {
27672768
invokeOnMessage(cRecord);
27682769
successTimer(sample, cRecord);
27692770
recordInterceptAfter(cRecord, null);
@@ -2802,6 +2803,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
28022803
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
28032804
observation.stop();
28042805
}
2806+
observationScope.close();
28052807
}
28062808
return null;
28072809
}
@@ -4020,6 +4022,6 @@ private static class StopAfterFenceException extends KafkaException {
40204022

40214023
}
40224024

4023-
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { };
4025+
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { }
40244026

40254027
}

0 commit comments

Comments
 (0)