Skip to content

Commit a7d88b6

Browse files
committed
Add RabbitMQ properties to enable observations
Observations can be enabled for the simple, direct and stream listener and on the RabbitTemplate. Closes gh-36451
1 parent 93a2b1c commit a7d88b6

File tree

6 files changed

+60
-3
lines changed

6 files changed

+60
-3
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java

+1
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ protected void configure(T factory, ConnectionFactory connectionFactory,
134134
if (this.taskExecutor != null) {
135135
factory.setTaskExecutor(this.taskExecutor);
136136
}
137+
factory.setObservationEnabled(configuration.isObservationEnabled());
137138
ListenerRetry retryConfig = configuration.getRetry();
138139
if (retryConfig.isEnabled()) {
139140
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java

+26
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,11 @@ public abstract static class BaseContainer {
723723
*/
724724
private boolean autoStartup = true;
725725

726+
/**
727+
* Whether to enable observation.
728+
*/
729+
private boolean observationEnabled;
730+
726731
public boolean isAutoStartup() {
727732
return this.autoStartup;
728733
}
@@ -731,6 +736,14 @@ public void setAutoStartup(boolean autoStartup) {
731736
this.autoStartup = autoStartup;
732737
}
733738

739+
public boolean isObservationEnabled() {
740+
return this.observationEnabled;
741+
}
742+
743+
public void setObservationEnabled(boolean observationEnabled) {
744+
this.observationEnabled = observationEnabled;
745+
}
746+
734747
}
735748

736749
public abstract static class AmqpContainer extends BaseContainer {
@@ -996,6 +1009,11 @@ public static class Template {
9961009
*/
9971010
private String defaultReceiveQueue;
9981011

1012+
/**
1013+
* Whether to enable observation.
1014+
*/
1015+
private boolean observationEnabled;
1016+
9991017
public Retry getRetry() {
10001018
return this.retry;
10011019
}
@@ -1048,6 +1066,14 @@ public void setDefaultReceiveQueue(String defaultReceiveQueue) {
10481066
this.defaultReceiveQueue = defaultReceiveQueue;
10491067
}
10501068

1069+
public boolean isObservationEnabled() {
1070+
return this.observationEnabled;
1071+
}
1072+
1073+
public void setObservationEnabled(boolean observationEnabled) {
1074+
this.observationEnabled = observationEnabled;
1075+
}
1076+
10511077
}
10521078

10531079
public static class Retry {

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
2626
import org.springframework.amqp.support.converter.MessageConverter;
2727
import org.springframework.beans.factory.ObjectProvider;
28+
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.StreamContainer;
2829
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
2930
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
3031
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -57,7 +58,9 @@ StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(Enviro
5758
ObjectProvider<ContainerCustomizer<StreamListenerContainer>> containerCustomizer) {
5859
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(
5960
rabbitStreamEnvironment);
60-
factory.setNativeListener(properties.getListener().getStream().isNativeListener());
61+
StreamContainer stream = properties.getListener().getStream();
62+
factory.setObservationEnabled(stream.isObservationEnabled());
63+
factory.setNativeListener(stream.isNativeListener());
6164
consumerCustomizer.ifUnique(factory::setConsumerCustomizer);
6265
containerCustomizer.ifUnique(factory::setContainerCustomizer);
6366
return factory;

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitTemplateConfigurer.java

+1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public void configure(RabbitTemplate template, ConnectionFactory connectionFacto
101101
map.from(templateProperties::getExchange).to(template::setExchange);
102102
map.from(templateProperties::getRoutingKey).to(template::setRoutingKey);
103103
map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
104+
map.from(templateProperties::isObservationEnabled).to(template::setObservationEnabled);
104105
}
105106

106107
private boolean determineMandatoryFlag() {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.rabbitmq.client.impl.CredentialsRefreshService;
3030
import com.rabbitmq.client.impl.DefaultCredentialsProvider;
3131
import org.aopalliance.aop.Advice;
32+
import org.assertj.core.api.InstanceOfAssertFactories;
3233
import org.junit.jupiter.api.Test;
3334
import org.junit.jupiter.api.condition.EnabledForJreRange;
3435
import org.junit.jupiter.api.condition.JRE;
@@ -371,6 +372,16 @@ void testRabbitTemplateExchangeAndRoutingKey() {
371372
});
372373
}
373374

375+
@Test
376+
void shouldConfigureObservationEnabledOnTemplate() {
377+
this.contextRunner.withUserConfiguration(TestConfiguration.class)
378+
.withPropertyValues("spring.rabbitmq.template.observation-enabled:true")
379+
.run((context) -> {
380+
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
381+
assertThat(rabbitTemplate).extracting("observationEnabled", InstanceOfAssertFactories.BOOLEAN).isTrue();
382+
});
383+
}
384+
374385
@Test
375386
void testRabbitTemplateDefaultReceiveQueue() {
376387
this.contextRunner.withUserConfiguration(TestConfiguration.class)
@@ -531,14 +542,16 @@ void testSimpleRabbitListenerContainerFactoryWithCustomSettings() {
531542
"spring.rabbitmq.listener.simple.idleEventInterval:5",
532543
"spring.rabbitmq.listener.simple.batchSize:20",
533544
"spring.rabbitmq.listener.simple.missingQueuesFatal:false",
534-
"spring.rabbitmq.listener.simple.force-stop:true")
545+
"spring.rabbitmq.listener.simple.force-stop:true",
546+
"spring.rabbitmq.listener.simple.observation-enabled:true")
535547
.run((context) -> {
536548
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
537549
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
538550
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("concurrentConsumers", 5);
539551
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("maxConcurrentConsumers", 10);
540552
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("batchSize", 20);
541553
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("missingQueuesFatal", false);
554+
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("observationEnabled", true);
542555
checkCommonProps(context, rabbitListenerContainerFactory);
543556
});
544557
}
@@ -582,12 +595,14 @@ void testDirectRabbitListenerContainerFactoryWithCustomSettings() {
582595
"spring.rabbitmq.listener.direct.defaultRequeueRejected:false",
583596
"spring.rabbitmq.listener.direct.idleEventInterval:5",
584597
"spring.rabbitmq.listener.direct.missingQueuesFatal:true",
585-
"spring.rabbitmq.listener.direct.force-stop:true")
598+
"spring.rabbitmq.listener.direct.force-stop:true",
599+
"spring.rabbitmq.listener.direct.observation-enabled:true")
586600
.run((context) -> {
587601
DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = context
588602
.getBean("rabbitListenerContainerFactory", DirectRabbitListenerContainerFactory.class);
589603
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("consumersPerQueue", 5);
590604
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("missingQueuesFatal", true);
605+
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("observationEnabled", true);
591606
checkCommonProps(context, rabbitListenerContainerFactory);
592607
});
593608
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java

+11
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
* @author Gary Russell
5555
* @author Andy Wilkinson
5656
* @author Eddú Meléndez
57+
* @author Moritz Halbritter
5758
*/
5859
class RabbitStreamConfigurationTests {
5960

@@ -88,6 +89,16 @@ void whenNativeListenerIsEnabledThenContainerFactoryIsConfiguredToUseNativeListe
8889
.isTrue());
8990
}
9091

92+
@Test
93+
void shouldConfigureObservations() {
94+
this.contextRunner
95+
.withPropertyValues("spring.rabbitmq.listener.type:stream",
96+
"spring.rabbitmq.listener.stream.observation-enabled:true")
97+
.run((context) -> assertThat(context.getBean(StreamRabbitListenerContainerFactory.class))
98+
.extracting("observationEnabled", InstanceOfAssertFactories.BOOLEAN)
99+
.isTrue());
100+
}
101+
91102
@Test
92103
void environmentIsAutoConfiguredByDefault() {
93104
this.contextRunner.run((context) -> assertThat(context).hasSingleBean(Environment.class));

0 commit comments

Comments
 (0)