Skip to content

Commit 159d2b4

Browse files
committed
AMQP-828: AutoRecovery with template.receive()
JIRA: https://jira.spring.io/browse/AMQP-828 Close auto-recoving channels during recovery since the consumer is no longer there. This was previously fixed for the `BlockingQueueConsumer`, but not for `template.receive()` operations. * Polishing - PR Comments AMQP-828: Fix race, removing the shutdown listener The channel might close between the `isOpen()` test and removing the listener.
1 parent 0e597c9 commit 159d2b4

File tree

6 files changed

+129
-43
lines changed

6 files changed

+129
-43
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

+5
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646

4747
import org.springframework.amqp.AmqpException;
4848
import org.springframework.amqp.AmqpTimeoutException;
49+
import org.springframework.amqp.rabbit.support.ClosingRecoveryListener;
4950
import org.springframework.amqp.rabbit.support.PublisherCallbackChannel;
5051
import org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl;
5152
import org.springframework.amqp.support.ConditionalExceptionLogger;
@@ -66,6 +67,7 @@
6667
import com.rabbitmq.client.Channel;
6768
import com.rabbitmq.client.ShutdownListener;
6869
import com.rabbitmq.client.ShutdownSignalException;
70+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
6971

7072
/**
7173
* A {@link ConnectionFactory} implementation that (when the cache mode is {@link CacheMode#CHANNEL} (default)
@@ -1175,6 +1177,9 @@ public void run() {
11751177
}
11761178
else {
11771179
this.target.close();
1180+
if (this.target instanceof AutorecoveringChannel) {
1181+
ClosingRecoveryListener.removeChannel((AutorecoveringChannel) this.target);
1182+
}
11781183
}
11791184
}
11801185
catch (AlreadyClosedException e) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

+3
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
5757
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
5858
import org.springframework.amqp.rabbit.connection.RabbitUtils;
59+
import org.springframework.amqp.rabbit.support.ClosingRecoveryListener;
5960
import org.springframework.amqp.rabbit.support.CorrelationData;
6061
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
6162
import org.springframework.amqp.rabbit.support.ListenerContainerAware;
@@ -1294,6 +1295,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
12941295
pendingReply.reply(reply);
12951296
}
12961297
};
1298+
ClosingRecoveryListener.addRecoveryListenerIfNecessary(channel);
12971299
channel.basicConsume(replyTo, true, consumerTag, true, true, null, consumer);
12981300
Message reply = null;
12991301
try {
@@ -1816,6 +1818,7 @@ public void handleConsumeOk(String consumerTag) {
18161818
}
18171819

18181820
};
1821+
ClosingRecoveryListener.addRecoveryListenerIfNecessary(channel);
18191822
channel.basicConsume(queueName, consumer);
18201823
if (!latch.await(10, TimeUnit.SECONDS)) {
18211824
if (channel instanceof ChannelProxy) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

+3-40
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
5050
import org.springframework.amqp.rabbit.connection.RabbitUtils;
5151
import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
52+
import org.springframework.amqp.rabbit.support.ClosingRecoveryListener;
5253
import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
5354
import org.springframework.amqp.rabbit.support.Delivery;
5455
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
@@ -65,10 +66,7 @@
6566
import com.rabbitmq.client.Consumer;
6667
import com.rabbitmq.client.DefaultConsumer;
6768
import com.rabbitmq.client.Envelope;
68-
import com.rabbitmq.client.Recoverable;
69-
import com.rabbitmq.client.RecoveryListener;
7069
import com.rabbitmq.client.ShutdownSignalException;
71-
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
7270
import com.rabbitmq.utility.Utility;
7371

7472
/**
@@ -83,7 +81,7 @@
8381
* @author Alex Panchenko
8482
* @author Johno Crawford
8583
*/
86-
public class BlockingQueueConsumer implements RecoveryListener {
84+
public class BlockingQueueConsumer {
8785

8886
private static Log logger = LogFactory.getLog(BlockingQueueConsumer.class);
8987

@@ -578,7 +576,7 @@ public void start() throws AmqpException {
578576
this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,
579577
this.transactional);
580578
this.channel = this.resourceHolder.getChannel();
581-
addRecoveryListener();
579+
ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel);
582580
}
583581
catch (AmqpAuthenticationException e) {
584582
throw new FatalListenerStartupException("Authentication failure", e);
@@ -662,19 +660,6 @@ else if (e.getFailedQueues().size() < this.queues.length) {
662660
}
663661
}
664662

665-
/**
666-
* Add a listener if necessary so we can immediately close an autorecovered
667-
* channel if necessary since the async consumer will no longer exist.
668-
*/
669-
private void addRecoveryListener() {
670-
if (this.channel instanceof ChannelProxy) {
671-
if (((ChannelProxy) this.channel).getTargetChannel() instanceof AutorecoveringChannel) {
672-
((AutorecoveringChannel) ((ChannelProxy) this.channel).getTargetChannel())
673-
.addRecoveryListener(this);
674-
}
675-
}
676-
}
677-
678663
private void consumeFromQueue(String queue) throws IOException {
679664
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
680665
(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
@@ -849,28 +834,6 @@ public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
849834

850835
}
851836

852-
@Override
853-
public void handleRecovery(Recoverable recoverable) {
854-
// should never get here
855-
handleRecoveryStarted(recoverable);
856-
}
857-
858-
@Override
859-
public void handleRecoveryStarted(Recoverable recoverable) {
860-
if (logger.isDebugEnabled()) {
861-
logger.debug("Closing an autorecovered channel: " + recoverable);
862-
}
863-
try {
864-
((Channel) recoverable).close();
865-
}
866-
catch (IOException e) {
867-
logger.debug("Error closing an autorecovered channel");
868-
}
869-
catch (TimeoutException e) {
870-
logger.debug("Error closing an autorecovered channel");
871-
}
872-
}
873-
874837
@Override
875838
public String toString() {
876839
return "Consumer@" + ObjectUtils.getIdentityHexString(this) + ": "
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.support;
18+
19+
import java.io.IOException;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.concurrent.ConcurrentMap;
22+
import java.util.concurrent.TimeoutException;
23+
24+
import org.apache.commons.logging.Log;
25+
import org.apache.commons.logging.LogFactory;
26+
27+
import org.springframework.amqp.rabbit.connection.ChannelProxy;
28+
29+
import com.rabbitmq.client.Channel;
30+
import com.rabbitmq.client.Recoverable;
31+
import com.rabbitmq.client.RecoveryListener;
32+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
33+
34+
/**
35+
* A {@link RecoveryListener} that closes the recovered channel, to avoid
36+
* orphaned consumers.
37+
*
38+
* @author Gary Russell
39+
* @since 1.7.10
40+
*
41+
*/
42+
public final class ClosingRecoveryListener implements RecoveryListener {
43+
44+
private static final Log logger = LogFactory.getLog(ClosingRecoveryListener.class);
45+
46+
private static final RecoveryListener INSTANCE = new ClosingRecoveryListener();
47+
48+
private static final ConcurrentMap<AutorecoveringChannel, Boolean> hasListener =
49+
new ConcurrentHashMap<AutorecoveringChannel, Boolean>();
50+
51+
private ClosingRecoveryListener() {
52+
super();
53+
}
54+
55+
@Override
56+
public void handleRecovery(Recoverable recoverable) {
57+
// should never get here
58+
handleRecoveryStarted(recoverable);
59+
}
60+
61+
@Override
62+
public void handleRecoveryStarted(Recoverable recoverable) {
63+
if (logger.isDebugEnabled()) {
64+
logger.debug("Closing an autorecovered channel: " + recoverable);
65+
}
66+
try {
67+
((Channel) recoverable).close();
68+
}
69+
catch (TimeoutException e) {
70+
logger.error("Error closing an autorecovered channel", e);
71+
}
72+
catch (IOException e) {
73+
logger.error("Error closing an autorecovered channel", e);
74+
}
75+
finally {
76+
hasListener.remove(recoverable);
77+
}
78+
}
79+
80+
/**
81+
* Add a listener if necessary so we can immediately close an autorecovered
82+
* channel if necessary since the actual consumer will no longer exist.
83+
* Idempotent operation.
84+
* @param channel the channel.
85+
*/
86+
public static void addRecoveryListenerIfNecessary(Channel channel) {
87+
AutorecoveringChannel autorecoveringChannel = null;
88+
if (channel instanceof ChannelProxy) {
89+
if (((ChannelProxy) channel).getTargetChannel() instanceof AutorecoveringChannel) {
90+
autorecoveringChannel = (AutorecoveringChannel) ((ChannelProxy) channel)
91+
.getTargetChannel();
92+
}
93+
}
94+
else if (channel instanceof AutorecoveringChannel) {
95+
autorecoveringChannel = (AutorecoveringChannel) channel;
96+
}
97+
if (autorecoveringChannel != null
98+
&& hasListener.putIfAbsent(autorecoveringChannel, Boolean.TRUE) == null) {
99+
autorecoveringChannel.addRecoveryListener(INSTANCE);
100+
}
101+
}
102+
103+
/**
104+
* Remove the channel from the set used to ensure that
105+
* {@link #addRecoveryListenerIfNecessary(Channel)} is idempotent.
106+
* @param channel the channel to remove.
107+
*/
108+
public static void removeChannel(AutorecoveringChannel channel) {
109+
hasListener.remove(channel);
110+
}
111+
112+
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/PublisherCallbackChannelImpl.java

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import com.rabbitmq.client.ReturnListener;
6161
import com.rabbitmq.client.ShutdownListener;
6262
import com.rabbitmq.client.ShutdownSignalException;
63+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
6364

6465
/**
6566
* Channel wrapper to allow a single listener able to handle
@@ -130,6 +131,9 @@ public Connection getConnection() {
130131
@Override
131132
public void close(int closeCode, String closeMessage) throws IOException, TimeoutException {
132133
this.delegate.close(closeCode, closeMessage);
134+
if (this.delegate instanceof AutorecoveringChannel) {
135+
ClosingRecoveryListener.removeChannel((AutorecoveringChannel) this.delegate);
136+
}
133137
}
134138

135139
/**

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.amqp.rabbit.listener;
1818

19-
import static org.hamcrest.Matchers.contains;
2019
import static org.hamcrest.Matchers.containsString;
2120
import static org.hamcrest.Matchers.equalTo;
2221
import static org.hamcrest.Matchers.instanceOf;
@@ -409,11 +408,11 @@ public void publishEvent(Object event) {
409408
container2.stop();
410409
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
411410
verify(logger, atLeastOnce()).info(captor.capture());
412-
assertThat(captor.getAllValues(), contains(containsString("exclusive")));
411+
assertThat(captor.getAllValues().toString(), containsString("exclusive"));
413412
assertEquals("Consumer raised exception, attempting restart", eventRef.get().getReason());
414413
assertFalse(eventRef.get().isFatal());
415414
assertThat(eventRef.get().getThrowable(), instanceOf(AmqpIOException.class));
416-
verify(containerLogger).warn(any());
415+
verify(containerLogger, atLeastOnce()).warn(any());
417416
}
418417

419418
@Test

0 commit comments

Comments
 (0)