Skip to content

Commit c8f4cb5

Browse files
committed
spring-projectsGH-2536: Exclusive Consumer Logging Improvements
Resolves spring-projects#2536 Log messages due to access refused due to exclusive consumers at DEBUG level instead of WARN and INFO.
1 parent f6d46f6 commit c8f4cb5

File tree

9 files changed

+88
-59
lines changed

9 files changed

+88
-59
lines changed

spring-amqp/src/main/java/org/springframework/amqp/support/ConditionalExceptionLogger.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.amqp.support;
1818

19+
import java.util.function.Supplier;
20+
1921
import org.apache.commons.logging.Log;
2022

2123
/**
@@ -37,4 +39,16 @@ public interface ConditionalExceptionLogger {
3739
*/
3840
void log(Log logger, String message, Throwable t);
3941

42+
/**
43+
* Log a consumer restart; debug by default.
44+
* @param logger the logger.
45+
* @param message the message.
46+
* @since 3.1
47+
*/
48+
default void logRestart(Log logger, Supplier<String> message) {
49+
if (logger.isDebugEnabled()) {
50+
logger.debug(message.get());
51+
}
52+
}
53+
4054
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ public void setConnectionNameStrategy(ConnectionNameStrategy connectionNameStrat
496496
* Set the strategy for logging close exceptions; by default, if a channel is closed due to a failed
497497
* passive queue declaration, it is logged at debug level. Normal channel closes (200 OK) are not
498498
* logged. All others are logged at ERROR level (unless access is refused due to an exclusive consumer
499-
* condition, in which case, it is logged at INFO level).
499+
* condition, in which case, it is logged at DEBUG level, since 3.1, previously INFO).
500500
* @param closeExceptionLogger the {@link ConditionalExceptionLogger}.
501501
* @since 1.5
502502
*/
@@ -720,10 +720,7 @@ public void handleUnblocked() {
720720
* close exceptions.
721721
* @since 1.5
722722
*/
723-
private static class DefaultChannelCloseLogger implements ConditionalExceptionLogger {
724-
725-
DefaultChannelCloseLogger() {
726-
}
723+
public static class DefaultChannelCloseLogger implements ConditionalExceptionLogger {
727724

728725
@Override
729726
public void log(Log logger, String message, Throwable t) {
@@ -734,8 +731,8 @@ public void log(Log logger, String message, Throwable t) {
734731
}
735732
}
736733
else if (RabbitUtils.isExclusiveUseChannelClose(cause)) {
737-
if (logger.isInfoEnabled()) {
738-
logger.info(message + ": " + cause.getMessage());
734+
if (logger.isDebugEnabled()) {
735+
logger.debug(message + ": " + cause.getMessage());
739736
}
740737
}
741738
else if (!RabbitUtils.isNormalChannelClose(cause)) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -409,4 +409,18 @@ public static SaslConfig stringToSaslConfig(String saslConfig,
409409
}
410410
}
411411

412+
/**
413+
* Determine whether the exception is due to an access refused for an exclusive consumer.
414+
* @param exception the exception.
415+
* @return true if access refused.
416+
* @since 3.1
417+
*/
418+
public static boolean exclusiveAccesssRefused(Exception exception) {
419+
return exception.getCause() instanceof IOException
420+
&& exception.getCause().getCause() instanceof ShutdownSignalException sse1
421+
&& isExclusiveUseChannelClose(sse1)
422+
|| exception.getCause() instanceof ShutdownSignalException sse2
423+
&& isExclusiveUseChannelClose(sse2);
424+
}
425+
412426
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

+5-21
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@
9191
import org.springframework.util.backoff.FixedBackOff;
9292

9393
import com.rabbitmq.client.Channel;
94-
import com.rabbitmq.client.ShutdownSignalException;
9594
import io.micrometer.observation.Observation;
9695
import io.micrometer.observation.ObservationRegistry;
9796

@@ -1030,7 +1029,7 @@ public void setStatefulRetryFatalWithNullMessageId(boolean statefulRetryFatalWit
10301029

10311030
/**
10321031
* Set a {@link ConditionalExceptionLogger} for logging exclusive consumer failures. The
1033-
* default is to log such failures at WARN level.
1032+
* default is to log such failures at DEBUG level (since 3.1, previously WARN).
10341033
* @param exclusiveConsumerExceptionLogger the conditional exception logger.
10351034
* @since 1.5
10361035
*/
@@ -2095,27 +2094,12 @@ protected WrappedTransactionException(Throwable cause) {
20952094
* consumer failures.
20962095
* @since 1.5
20972096
*/
2098-
private static class DefaultExclusiveConsumerLogger implements ConditionalExceptionLogger {
2099-
2100-
DefaultExclusiveConsumerLogger() {
2101-
}
2097+
public static class DefaultExclusiveConsumerLogger implements ConditionalExceptionLogger {
21022098

21032099
@Override
2104-
public void log(Log logger, String message, Throwable t) {
2105-
if (t instanceof ShutdownSignalException cause) {
2106-
if (RabbitUtils.isExclusiveUseChannelClose(cause)) {
2107-
if (logger.isWarnEnabled()) {
2108-
logger.warn(message + ": " + cause.toString());
2109-
}
2110-
}
2111-
else if (!RabbitUtils.isNormalChannelClose(cause)) {
2112-
logger.error(message + ": " + cause.getMessage());
2113-
}
2114-
}
2115-
else {
2116-
if (logger.isErrorEnabled()) {
2117-
logger.error("Unexpected invocation of " + getClass() + ", with message: " + message, t);
2118-
}
2100+
public void log(Log logger, String message, Throwable cause) {
2101+
if (logger.isDebugEnabled()) {
2102+
logger.debug(message + ": " + cause.toString());
21192103
}
21202104
}
21212105

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

+9-11
Original file line numberDiff line numberDiff line change
@@ -782,24 +782,22 @@ private SimpleConsumer consume(String queue, int index, Connection connection) {
782782

783783
@Nullable
784784
private SimpleConsumer handleConsumeException(String queue, int index, @Nullable SimpleConsumer consumerArg,
785-
Exception e) {
785+
Exception ex) {
786786

787787
SimpleConsumer consumer = consumerArg;
788-
if (e.getCause() instanceof ShutdownSignalException
789-
&& e.getCause().getMessage().contains("in exclusive use")) {
790-
getExclusiveConsumerExceptionLogger().log(logger,
791-
"Exclusive consumer failure", e.getCause());
792-
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
793-
}
794-
else if (e.getCause() instanceof ShutdownSignalException
795-
&& RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException) e.getCause())) {
788+
if (RabbitUtils.exclusiveAccesssRefused(ex)) {
789+
getExclusiveConsumerExceptionLogger().log(logger, "Exclusive consumer failure", ex.getCause());
790+
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, ex);
791+
}
792+
else if (ex.getCause() instanceof ShutdownSignalException
793+
&& RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException) ex.getCause())) {
796794
publishMissingQueueEvent(queue);
797795
this.logger.error("Queue not present, scheduling consumer "
798-
+ (consumer == null ? "for queue " + queue : consumer) + " for restart", e);
796+
+ (consumer == null ? "for queue " + queue : consumer) + " for restart", ex);
799797
}
800798
else if (this.logger.isWarnEnabled()) {
801799
this.logger.warn("basicConsume failed, scheduling consumer "
802-
+ (consumer == null ? "for queue " + queue : consumer) + " for restart", e);
800+
+ (consumer == null ? "for queue " + queue : consumer) + " for restart", ex);
803801
}
804802

805803
if (consumer == null) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.amqp.rabbit.listener;
1818

19-
import java.io.IOException;
2019
import java.util.ArrayList;
2120
import java.util.Arrays;
2221
import java.util.Collection;
@@ -1165,6 +1164,8 @@ private final class AsyncMessageProcessingConsumer implements Runnable {
11651164

11661165
private int consecutiveMessages;
11671166

1167+
private boolean failedExclusive;
1168+
11681169

11691170
AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
11701171
this.consumer = consumer;
@@ -1276,8 +1277,8 @@ public void run() { // NOSONAR - line count
12761277
}
12771278
}
12781279
catch (AmqpIOException e) {
1279-
if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
1280-
&& e.getCause().getCause().getMessage().contains("in exclusive use")) {
1280+
if (RabbitUtils.exclusiveAccesssRefused(e)) {
1281+
this.failedExclusive = true;
12811282
getExclusiveConsumerExceptionLogger().log(logger,
12821283
"Exclusive consumer failure", e.getCause().getCause());
12831284
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
@@ -1460,7 +1461,12 @@ private void killOrRestart(boolean aborted) {
14601461
}
14611462
}
14621463
else {
1463-
logger.info("Restarting " + this.consumer);
1464+
if (this.failedExclusive) {
1465+
getExclusiveConsumerExceptionLogger().logRestart(logger, () -> "Restarting " + this.consumer);
1466+
}
1467+
else {
1468+
logger.info("Restarting " + this.consumer);
1469+
}
14641470
restart(this.consumer);
14651471
}
14661472
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.awaitility.Awaitility.await;
2121
import static org.awaitility.Awaitility.with;
22-
import static org.mockito.ArgumentMatchers.any;
2322
import static org.mockito.ArgumentMatchers.anyBoolean;
2423
import static org.mockito.BDDMockito.given;
2524
import static org.mockito.BDDMockito.willAnswer;
@@ -347,7 +346,7 @@ public void testListenFromAnonQueue() throws Exception {
347346
@Test
348347
public void testExclusive() throws Exception {
349348
Log logger = spy(TestUtils.getPropertyValue(this.template.getConnectionFactory(), "logger", Log.class));
350-
willReturn(true).given(logger).isInfoEnabled();
349+
willReturn(true).given(logger).isDebugEnabled();
351350
new DirectFieldAccessor(this.template.getConnectionFactory()).setPropertyValue("logger", logger);
352351
CountDownLatch latch1 = new CountDownLatch(1000);
353352
SimpleMessageListenerContainer container1 =
@@ -388,7 +387,7 @@ else if (event instanceof ConsumeOkEvent) {
388387
});
389388
container2.afterPropertiesSet();
390389
Log containerLogger = spy(TestUtils.getPropertyValue(container2, "logger", Log.class));
391-
willReturn(true).given(containerLogger).isWarnEnabled();
390+
willReturn(true).given(containerLogger).isDebugEnabled();
392391
new DirectFieldAccessor(container2).setPropertyValue("logger", containerLogger);
393392
container2.start();
394393
for (int i = 0; i < 1000; i++) {
@@ -404,13 +403,15 @@ else if (event instanceof ConsumeOkEvent) {
404403
}
405404
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
406405
container2.stop();
407-
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
408-
verify(logger, atLeastOnce()).info(captor.capture());
409-
assertThat(captor.getAllValues()).anyMatch(arg -> arg.contains("exclusive"));
406+
ArgumentCaptor<String> connLogCaptor = ArgumentCaptor.forClass(String.class);
407+
verify(logger, atLeastOnce()).debug(connLogCaptor.capture());
408+
assertThat(connLogCaptor.getAllValues()).anyMatch(arg -> arg.contains("exclusive"));
410409
assertThat(eventRef.get().getReason()).isEqualTo("Consumer raised exception, attempting restart");
411410
assertThat(eventRef.get().isFatal()).isFalse();
412411
assertThat(eventRef.get().getThrowable()).isInstanceOf(AmqpIOException.class);
413-
verify(containerLogger, atLeastOnce()).warn(any());
412+
ArgumentCaptor<String> contLogCaptor = ArgumentCaptor.forClass(String.class);
413+
verify(logger, atLeastOnce()).debug(contLogCaptor.capture());
414+
assertThat(contLogCaptor.getAllValues()).anyMatch(arg -> arg.contains("exclusive"));
414415
}
415416

416417
@Test

src/reference/asciidoc/amqp.adoc

+12-5
Original file line numberDiff line numberDiff line change
@@ -940,17 +940,19 @@ See <<publishing-is-async>> for one scenario where you might want to register a
940940

941941
Version 1.5 introduced a mechanism to enable users to control logging levels.
942942

943-
The `CachingConnectionFactory` uses a default strategy to log channel closures as follows:
943+
The `AbstractConnectionFactory` uses a default strategy to log channel closures as follows:
944944

945945
* Normal channel closes (200 OK) are not logged.
946-
* If a channel is closed due to a failed passive queue declaration, it is logged at debug level.
946+
* If a channel is closed due to a failed passive queue declaration, it is logged at DEBUG level.
947947
* If a channel is closed because the `basic.consume` is refused due to an exclusive consumer condition, it is logged at
948-
INFO level.
948+
DEBUG level (since 3.1, previously INFO).
949949
* All others are logged at ERROR level.
950950

951951
To modify this behavior, you can inject a custom `ConditionalExceptionLogger` into the
952952
`CachingConnectionFactory` in its `closeExceptionLogger` property.
953953

954+
Also, the `AbstractConnectionFactory.DefaultChannelCloseLogger` is now public, allowing it to be sub classed.
955+
954956
See also <<consumer-events>>.
955957

956958
[[runtime-cache-properties]]
@@ -2364,8 +2366,13 @@ These events can be consumed by implementing `ApplicationListener<ListenerContai
23642366

23652367
NOTE: System-wide events (such as connection failures) are published by all consumers when `concurrentConsumers` is greater than 1.
23662368

2367-
If a consumer fails because one if its queues is being used exclusively, by default, as well as publishing the event, a `WARN` log is issued.
2368-
To change this logging behavior, provide a custom `ConditionalExceptionLogger` in the `SimpleMessageListenerContainer` instance's `exclusiveConsumerExceptionLogger` property.
2369+
If a consumer fails because one if its queues is being used exclusively, by default, as well as publishing the event, a `DEBUG` log is issued (since 3.1, previously WARN).
2370+
To change this logging behavior, provide a custom `ConditionalExceptionLogger` in the `AbstractMessageListenerContainer` instance's `exclusiveConsumerExceptionLogger` property.
2371+
In addition, the `SimpleMessageListenerContainer` consumer restart after such an exception is now logged at DEBUG level by default (previously INFO).
2372+
A new method `logRestart()` has been added to the `ConditionalExceptionLogger` to allow this to be changed.
2373+
2374+
Also, the `AbstractMessageListenerContainer.DefaultExclusiveConsumerLogger` is now public, allowing it to be sub classed.
2375+
23692376
See also <<channel-close-logging>>.
23702377

23712378
Fatal errors are always logged at the `ERROR` level.

src/reference/asciidoc/whats-new.adoc

+10-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
[[whats-new]]
22
== What's New
33

4-
=== Changes in 3.0 Since 2.4
4+
=== Changes in 3.1 Since 3.0
55

6-
==== Java 17, Spring Framework 6.0
6+
==== Java 17, Spring Framework 6.1
77

88
This version requires Spring Framework 6.1 and Java 17.
99

10+
[[31-exc]]
11+
==== Exclusive Consumer Logging
12+
13+
Log messages reporting access refusal due to exclusive consumers are now logged at DEBUG level by default.
14+
It remains possible to configure your own logging behavior by setting the `exclusiveConsumerExceptionLogger` and `closeExceptionLogger` properties on the listener container and connection factory respectively.
15+
In addition, the `SimpleMessageListenerContainer` consumer restart after such an exception is now logged at DEBUG level by default (previously INFO).
16+
A new method `logRestart()` has been added to the `ConditionalExceptionLogger` to allow this to be changed.
17+
See <<consumer-events>> and <<channel-close-logging>> for more information.

0 commit comments

Comments
 (0)