|
43 | 43 | import java.util.concurrent.TimeUnit;
|
44 | 44 | import java.util.concurrent.atomic.AtomicBoolean;
|
45 | 45 | import java.util.function.BiConsumer;
|
| 46 | +import java.util.function.Supplier; |
46 | 47 | import java.util.regex.Pattern;
|
47 | 48 | import java.util.stream.Collectors;
|
48 | 49 |
|
|
107 | 108 | import org.springframework.kafka.support.LogIfLevelEnabled;
|
108 | 109 | import org.springframework.kafka.support.TopicPartitionOffset;
|
109 | 110 | import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
|
110 |
| -import org.springframework.kafka.support.micrometer.DefaultKafkaListenerObservationConvention; |
111 | 111 | import org.springframework.kafka.support.micrometer.KafkaListenerObservation;
|
| 112 | +import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention; |
112 | 113 | import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
|
113 | 114 | import org.springframework.kafka.support.micrometer.MicrometerHolder;
|
114 | 115 | import org.springframework.kafka.support.serializer.DeserializationException;
|
@@ -365,9 +366,9 @@ protected void doStart() {
|
365 | 366 | }
|
366 | 367 | GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
|
367 | 368 | ListenerType listenerType = determineListenerType(listener);
|
368 |
| - ObservationRegistry observationRegistry = null; |
| 369 | + ObservationRegistry observationRegistry = ObservationRegistry.NOOP; |
369 | 370 | ApplicationContext applicationContext = getApplicationContext();
|
370 |
| - if (applicationContext != null) { |
| 371 | + if (applicationContext != null && containerProperties.isObservationEnabled()) { |
371 | 372 | ObjectProvider<ObservationRegistry> registry =
|
372 | 373 | applicationContext.getBeanProvider(ObservationRegistry.class);
|
373 | 374 | observationRegistry = registry.getIfUnique();
|
@@ -827,7 +828,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
|
827 | 828 |
|
828 | 829 | @SuppressWarnings(UNCHECKED)
|
829 | 830 | ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
|
830 |
| - @Nullable ObservationRegistry observationRegistry) { |
| 831 | + ObservationRegistry observationRegistry) { |
831 | 832 |
|
832 | 833 | this.observationRegistry = observationRegistry;
|
833 | 834 | Properties consumerProperties = propertiesFromProperties();
|
@@ -2706,16 +2707,12 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
|
2706 | 2707 | Iterator<ConsumerRecord<K, V>> iterator) {
|
2707 | 2708 |
|
2708 | 2709 | Object sample = startMicrometerSample();
|
2709 |
| - Observation observation; |
2710 |
| - if (!this.containerProperties.isObservationEnabled() || this.observationRegistry == null) { |
2711 |
| - observation = Observation.NOOP; |
2712 |
| - } |
2713 |
| - else { |
2714 |
| - observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation( |
2715 |
| - this.containerProperties.getObservationConvention(), |
2716 |
| - DefaultKafkaListenerObservationConvention.INSTANCE, |
2717 |
| - new KafkaRecordReceiverContext(record, getListenerId()), this.observationRegistry); |
2718 |
| - } |
| 2710 | + Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation( |
| 2711 | + this.containerProperties.getObservationConvention(), |
| 2712 | + DefaultKafkaListenerObservationConvention.INSTANCE, |
| 2713 | + (Supplier<KafkaRecordReceiverContext>) () -> new KafkaRecordReceiverContext(record, |
| 2714 | + getListenerId()), |
| 2715 | + this.observationRegistry); |
2719 | 2716 | return observation.observe(() -> {
|
2720 | 2717 | try {
|
2721 | 2718 | invokeOnMessage(record);
|
|
0 commit comments