Skip to content

Commit 66e5c11

Browse files
GH-1444: Listener Observability Initial Commit (#1500)
* GH-1444: Listener Observability Initial Commit - expand scope to include error handler: #1287 - tracing can't be used with a batch listener (multiple messages in listener call) * Rename Sender/Receiver contexts and other PR review comments. * Rename contexts to Rabbit...; supply default KeyValues via the conventions. * Javadoc polishing. * Don't add default KV to high-card KVs. * Fix previous commit. * Fix contextual name (receiver side). * Fix checkstyle. * Polish previous commit. * Fix contextual name (sender side) * Remove contextual names from observations. * Fix checkstyle. * Remove customization of KeyValues from conventions. * Add `getDefaultConvention()` to observations. * Fix since 3.0. * Support wider convention customization. * Convention type safety. * Fix Test - not sure why PR build succeeded. * Add Meters to ObservationTests. * Fix checkstyle. * Make INSTANCE final. * Add integration test. * Test all available integrations. * Remove redundant test code. * Move getContextualName to conventions. * Add docs. * Fix doc link. Co-authored-by: Artem Bilan <[email protected]> * Remove unnecessary method overrides; make tag names more meaningful. * Move getName() from contexts to conventions. * Fix Race in Test * Fix Race in Test. Co-authored-by: Artem Bilan <[email protected]>
1 parent 80f8c9c commit 66e5c11

17 files changed

+1043
-10
lines changed

build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,7 @@ project('spring-rabbit') {
387387
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
388388
optionalApi 'org.apache.logging.log4j:log4j-core'
389389
optionalApi 'io.micrometer:micrometer-core'
390+
api 'io.micrometer:micrometer-observation'
390391
optionalApi 'io.micrometer:micrometer-tracing'
391392
// Spring Data projection message binding support
392393
optionalApi ("org.springframework.data:spring-data-commons") {
@@ -398,6 +399,7 @@ project('spring-rabbit') {
398399
testApi project(':spring-rabbit-junit')
399400
testImplementation("com.willowtreeapps.assertk:assertk-jvm:$assertkVersion")
400401
testImplementation "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion"
402+
testImplementation 'io.micrometer:micrometer-observation-test'
401403
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
402404
testImplementation 'io.micrometer:micrometer-tracing-test'
403405
testImplementation 'io.micrometer:micrometer-tracing-integration-test'

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitAccessor.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,10 +21,13 @@
2121

2222
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
2323
import org.springframework.beans.factory.InitializingBean;
24+
import org.springframework.beans.factory.ObjectProvider;
25+
import org.springframework.context.ApplicationContext;
2426
import org.springframework.lang.Nullable;
2527
import org.springframework.util.Assert;
2628

2729
import com.rabbitmq.client.Channel;
30+
import io.micrometer.observation.ObservationRegistry;
2831

2932
/**
3033
* @author Mark Fisher
@@ -40,6 +43,8 @@ public abstract class RabbitAccessor implements InitializingBean {
4043

4144
private volatile boolean transactional;
4245

46+
private ObservationRegistry observationRegistry;
47+
4348
public boolean isChannelTransacted() {
4449
return this.transactional;
4550
}
@@ -113,4 +118,17 @@ protected RuntimeException convertRabbitAccessException(Exception ex) {
113118
return RabbitExceptionTranslator.convertRabbitAccessException(ex);
114119
}
115120

121+
protected void obtainObservationRegistry(@Nullable ApplicationContext appContext) {
122+
if (this.observationRegistry == null && appContext != null) {
123+
ObjectProvider<ObservationRegistry> registry =
124+
appContext.getBeanProvider(ObservationRegistry.class);
125+
this.observationRegistry = registry.getIfUnique();
126+
}
127+
}
128+
129+
@Nullable
130+
protected ObservationRegistry getObservationRegistry() {
131+
return this.observationRegistry;
132+
}
133+
116134
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

+72-6
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@
7474
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
7575
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
7676
import org.springframework.amqp.rabbit.support.ValueExpression;
77+
import org.springframework.amqp.rabbit.support.micrometer.DefaultRabbitTemplateObservationConvention;
78+
import org.springframework.amqp.rabbit.support.micrometer.RabbitMessageSenderContext;
79+
import org.springframework.amqp.rabbit.support.micrometer.RabbitTemplateObservation;
80+
import org.springframework.amqp.rabbit.support.micrometer.RabbitTemplateObservationConvention;
7781
import org.springframework.amqp.support.converter.MessageConverter;
7882
import org.springframework.amqp.support.converter.SimpleMessageConverter;
7983
import org.springframework.amqp.support.converter.SmartMessageConverter;
@@ -83,6 +87,8 @@
8387
import org.springframework.beans.factory.BeanFactoryAware;
8488
import org.springframework.beans.factory.BeanNameAware;
8589
import org.springframework.beans.factory.DisposableBean;
90+
import org.springframework.context.ApplicationContext;
91+
import org.springframework.context.ApplicationContextAware;
8692
import org.springframework.context.expression.BeanFactoryResolver;
8793
import org.springframework.context.expression.MapAccessor;
8894
import org.springframework.core.ParameterizedTypeReference;
@@ -108,6 +114,8 @@
108114
import com.rabbitmq.client.Return;
109115
import com.rabbitmq.client.ShutdownListener;
110116
import com.rabbitmq.client.ShutdownSignalException;
117+
import io.micrometer.observation.Observation;
118+
import io.micrometer.observation.ObservationRegistry;
111119

112120
/**
113121
* <p>
@@ -152,7 +160,7 @@
152160
* @since 1.0
153161
*/
154162
public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
155-
implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
163+
implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener, ApplicationContextAware,
156164
ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
157165

158166
private static final String UNCHECKED = "unchecked";
@@ -198,6 +206,8 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
198206

199207
private final AtomicInteger containerInstance = new AtomicInteger();
200208

209+
private ApplicationContext applicationContext;
210+
201211
private String exchange = DEFAULT_EXCHANGE;
202212

203213
private String routingKey = DEFAULT_ROUTING_KEY;
@@ -258,13 +268,20 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
258268

259269
private ErrorHandler replyErrorHandler;
260270

271+
private boolean useChannelForCorrelation;
272+
273+
private boolean observationEnabled;
274+
275+
@Nullable
276+
private RabbitTemplateObservationConvention observationConvention;
277+
261278
private volatile boolean usingFastReplyTo;
262279

263280
private volatile boolean evaluatedFastReplyTo;
264281

265282
private volatile boolean isListener;
266283

267-
private boolean useChannelForCorrelation;
284+
private volatile boolean observationRegistryObtained;
268285

269286
/**
270287
* Convenient constructor for use with setter injection. Don't forget to set the connection factory.
@@ -297,6 +314,29 @@ public final void setConnectionFactory(ConnectionFactory connectionFactory) {
297314
}
298315
}
299316

317+
@Override
318+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
319+
this.applicationContext = applicationContext;
320+
}
321+
322+
/**
323+
* Enable observation via micrometer.
324+
* @param observationEnabled true to enable.
325+
* @since 3.0
326+
*/
327+
public void setObservationEnabled(boolean observationEnabled) {
328+
this.observationEnabled = observationEnabled;
329+
}
330+
331+
/**
332+
* Set an observation convention; used to add additional key/values to observations.
333+
* @param observationConvention the convention.
334+
* @since 3.0
335+
*/
336+
public void setObservationConvention(RabbitTemplateObservationConvention observationConvention) {
337+
this.observationConvention = observationConvention;
338+
}
339+
300340
/**
301341
* The name of the default exchange to use for send operations when none is specified. Defaults to <code>""</code>
302342
* which is the default exchange in the broker (per the AMQP specification).
@@ -2348,7 +2388,7 @@ private boolean isPublisherConfirmsOrReturns(ConnectionFactory connectionFactory
23482388
* @throws IOException If thrown by RabbitMQ API methods.
23492389
*/
23502390
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
2351-
boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
2391+
boolean mandatory, @Nullable CorrelationData correlationData) {
23522392

23532393
String exch = nullSafeExchange(exchangeArg);
23542394
String rKey = nullSafeRoutingKey(routingKeyArg);
@@ -2378,14 +2418,34 @@ public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Me
23782418
logger.debug("Publishing message [" + messageToUse
23792419
+ "] on exchange [" + exch + "], routingKey = [" + rKey + "]");
23802420
}
2381-
sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
2421+
observeTheSend(channel, messageToUse, mandatory, exch, rKey);
23822422
// Check if commit needed
23832423
if (isChannelLocallyTransacted(channel)) {
23842424
// Transacted channel created by this template -> commit.
23852425
RabbitUtils.commitIfNecessary(channel);
23862426
}
23872427
}
23882428

2429+
protected void observeTheSend(Channel channel, Message message, boolean mandatory, String exch, String rKey) {
2430+
2431+
if (!this.observationRegistryObtained) {
2432+
obtainObservationRegistry(this.applicationContext);
2433+
this.observationRegistryObtained = true;
2434+
}
2435+
Observation observation;
2436+
ObservationRegistry registry = getObservationRegistry();
2437+
if (!this.observationEnabled || registry == null) {
2438+
observation = Observation.NOOP;
2439+
}
2440+
else {
2441+
observation = RabbitTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention,
2442+
DefaultRabbitTemplateObservationConvention.INSTANCE,
2443+
new RabbitMessageSenderContext(message, this.beanName, exch + "/" + rKey), registry);
2444+
2445+
}
2446+
observation.observe(() -> sendToRabbit(channel, exch, rKey, mandatory, message));
2447+
}
2448+
23892449
/**
23902450
* Return the exchange or the default exchange if null.
23912451
* @param exchange the exchange.
@@ -2407,10 +2467,16 @@ public String nullSafeRoutingKey(String rk) {
24072467
}
24082468

24092469
protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
2410-
Message message) throws IOException {
2470+
Message message) {
2471+
24112472
BasicProperties convertedMessageProperties = this.messagePropertiesConverter
24122473
.fromMessageProperties(message.getMessageProperties(), this.encoding);
2413-
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
2474+
try {
2475+
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
2476+
}
2477+
catch (IOException ex) {
2478+
throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
2479+
}
24142480
}
24152481

24162482
private void setupConfirm(Channel channel, Message message, @Nullable CorrelationData correlationDataArg) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

+52-3
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@
6363
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
6464
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
6565
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
66+
import org.springframework.amqp.rabbit.support.micrometer.DefaultRabbitListenerObservationConvention;
67+
import org.springframework.amqp.rabbit.support.micrometer.RabbitListenerObservation;
68+
import org.springframework.amqp.rabbit.support.micrometer.RabbitListenerObservationConvention;
69+
import org.springframework.amqp.rabbit.support.micrometer.RabbitMessageReceiverContext;
6670
import org.springframework.amqp.support.ConditionalExceptionLogger;
6771
import org.springframework.amqp.support.ConsumerTagStrategy;
6872
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
@@ -91,6 +95,8 @@
9195

9296
import com.rabbitmq.client.Channel;
9397
import com.rabbitmq.client.ShutdownSignalException;
98+
import io.micrometer.observation.Observation;
99+
import io.micrometer.observation.ObservationRegistry;
94100

95101
/**
96102
* @author Mark Pollack
@@ -240,6 +246,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor
240246

241247
private boolean micrometerEnabled = true;
242248

249+
private boolean observationEnabled = false;
250+
243251
private boolean isBatchListener;
244252

245253
private long consumeDelay;
@@ -254,6 +262,9 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor
254262

255263
private MessageAckListener messageAckListener = (success, deliveryTag, cause) -> { };
256264

265+
@Nullable
266+
private RabbitListenerObservationConvention observationConvention;
267+
257268
@Override
258269
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
259270
this.applicationEventPublisher = applicationEventPublisher;
@@ -1151,14 +1162,36 @@ public void setMicrometerTags(Map<String, String> tags) {
11511162
}
11521163

11531164
/**
1154-
* Set to false to disable micrometer listener timers.
1165+
* Set to false to disable micrometer listener timers. When true, ignored
1166+
* if {@link #setObservationEnabled(boolean)} is set to true.
11551167
* @param micrometerEnabled false to disable.
11561168
* @since 2.2
1169+
* @see #setObservationEnabled(boolean)
11571170
*/
11581171
public void setMicrometerEnabled(boolean micrometerEnabled) {
11591172
this.micrometerEnabled = micrometerEnabled;
11601173
}
11611174

1175+
/**
1176+
* Enable observation via micrometer; disables basic Micrometer timers enabled
1177+
* by {@link #setMicrometerEnabled(boolean)}.
1178+
* @param observationEnabled true to enable.
1179+
* @since 3.0
1180+
* @see #setMicrometerEnabled(boolean)
1181+
*/
1182+
public void setObservationEnabled(boolean observationEnabled) {
1183+
this.observationEnabled = observationEnabled;
1184+
}
1185+
1186+
/**
1187+
* Set an observation convention; used to add additional key/values to observations.
1188+
* @param observationConvention the convention.
1189+
* @since 3.0
1190+
*/
1191+
public void setObservationConvention(RabbitListenerObservationConvention observationConvention) {
1192+
this.observationConvention = observationConvention;
1193+
}
1194+
11621195
/**
11631196
* Get the consumeDelay - a time to wait before consuming in ms.
11641197
* @return the consume delay.
@@ -1230,7 +1263,7 @@ public void afterPropertiesSet() {
12301263
validateConfiguration();
12311264
initialize();
12321265
try {
1233-
if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled
1266+
if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled && !this.observationEnabled
12341267
&& this.applicationContext != null) {
12351268
String id = getListenerId();
12361269
if (id == null) {
@@ -1402,6 +1435,7 @@ public void start() {
14021435
}
14031436
}
14041437
}
1438+
obtainObservationRegistry(this.applicationContext);
14051439
try {
14061440
logger.debug("Starting Rabbit listener container.");
14071441
configureAdminIfNeeded();
@@ -1499,8 +1533,23 @@ protected void invokeErrorHandler(Throwable ex) {
14991533
* @see #invokeListener
15001534
* @see #handleListenerException
15011535
*/
1502-
@SuppressWarnings(UNCHECKED)
15031536
protected void executeListener(Channel channel, Object data) {
1537+
Observation observation;
1538+
ObservationRegistry registry = getObservationRegistry();
1539+
if (!this.observationEnabled || data instanceof List || registry == null) {
1540+
observation = Observation.NOOP;
1541+
}
1542+
else {
1543+
Message message = (Message) data;
1544+
observation = RabbitListenerObservation.LISTENER_OBSERVATION.observation(this.observationConvention,
1545+
DefaultRabbitListenerObservationConvention.INSTANCE,
1546+
new RabbitMessageReceiverContext(message, getListenerId()), registry);
1547+
}
1548+
observation.observe(() -> executeListenerAndHandleException(channel, data));
1549+
}
1550+
1551+
@SuppressWarnings(UNCHECKED)
1552+
protected void executeListenerAndHandleException(Channel channel, Object data) {
15041553
if (!isRunning()) {
15051554
if (logger.isWarnEnabled()) {
15061555
logger.warn(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.support.micrometer;
18+
19+
import io.micrometer.common.KeyValues;
20+
21+
/**
22+
* Default {@link RabbitListenerObservationConvention} for Rabbit listener key values.
23+
*
24+
* @author Gary Russell
25+
* @since 3.0
26+
*
27+
*/
28+
public class DefaultRabbitListenerObservationConvention implements RabbitListenerObservationConvention {
29+
30+
/**
31+
* A singleton instance of the convention.
32+
*/
33+
public static final DefaultRabbitListenerObservationConvention INSTANCE =
34+
new DefaultRabbitListenerObservationConvention();
35+
36+
@Override
37+
public KeyValues getLowCardinalityKeyValues(RabbitMessageReceiverContext context) {
38+
return KeyValues.of(RabbitListenerObservation.ListenerLowCardinalityTags.LISTENER_ID.asString(),
39+
context.getListenerId());
40+
}
41+
42+
@Override
43+
public String getContextualName(RabbitMessageReceiverContext context) {
44+
return context.getSource() + " receive";
45+
}
46+
47+
}

0 commit comments

Comments
 (0)