Skip to content

Commit 18f70d5

Browse files
garyrussellartembilan
authored andcommitted
AMQP-829: RabbitTemplate Req/Resp Fail Fast
JIRA: https://jira.spring.io/browse/AMQP-829 If the channel closes while we are waiting for a reply, throw an exception instead of waiting to time out. * Polishing - PR Comments
1 parent 1ed2b12 commit 18f70d5

File tree

2 files changed

+81
-22
lines changed

2 files changed

+81
-22
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

+31-22
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import java.util.Map;
2626
import java.util.Set;
2727
import java.util.UUID;
28-
import java.util.concurrent.ArrayBlockingQueue;
29-
import java.util.concurrent.BlockingQueue;
3028
import java.util.concurrent.CompletableFuture;
3129
import java.util.concurrent.ConcurrentHashMap;
3230
import java.util.concurrent.ConcurrentMap;
@@ -1601,6 +1599,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
16011599

16021600
};
16031601
ClosingRecoveryListener.addRecoveryListenerIfNecessary(channel);
1602+
ShutdownListener shutdownListener = c -> pendingReply.completeExceptionally(c);
1603+
channel.addShutdownListener(shutdownListener);
16041604
channel.basicConsume(replyTo, true, consumerTag, this.noLocalReplyConsumer, true, null, consumer);
16051605
Message reply = null;
16061606
try {
@@ -1612,20 +1612,26 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
16121612
if (channel.isOpen()) {
16131613
cancelConsumerQuietly(channel, consumer);
16141614
}
1615+
try {
1616+
channel.removeShutdownListener(shutdownListener);
1617+
}
1618+
catch (Exception e) {
1619+
// NOSONAR - channel might have closed.
1620+
}
16151621
}
16161622
return reply;
16171623
}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
16181624
}
16191625

16201626
private void cancelConsumerQuietly(Channel channel, DefaultConsumer consumer) {
16211627
try {
1622-
channel.basicCancel(consumer.getConsumerTag());
1623-
}
1624-
catch (Exception e) {
1628+
channel.basicCancel(consumer.getConsumerTag());
1629+
}
1630+
catch (Exception e) {
16251631
if (this.logger.isDebugEnabled()) {
16261632
this.logger.debug("Failed to cancel consumer: " + consumer, e);
16271633
}
1628-
}
1634+
}
16291635
}
16301636

16311637
protected Message doSendAndReceiveWithFixed(final String exchange, final String routingKey, final Message message,
@@ -2331,7 +2337,7 @@ private static class PendingReply {
23312337

23322338
private volatile String savedCorrelation;
23332339

2334-
private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
2340+
private final CompletableFuture<Message> future = new CompletableFuture<>();
23352341

23362342
public String getSavedReplyTo() {
23372343
return this.savedReplyTo;
@@ -2350,33 +2356,36 @@ public void setSavedCorrelation(String savedCorrelation) {
23502356
}
23512357

23522358
public Message get() throws InterruptedException {
2353-
Object reply = this.queue.take();
2354-
return processReply(reply);
2359+
try {
2360+
return this.future.get();
2361+
}
2362+
catch (ExecutionException e) {
2363+
throw RabbitExceptionTranslator.convertRabbitAccessException(e.getCause());
2364+
}
23552365
}
23562366

23572367
public Message get(long timeout, TimeUnit unit) throws InterruptedException {
2358-
Object reply = this.queue.poll(timeout, unit);
2359-
return reply == null ? null : processReply(reply);
2360-
}
2361-
2362-
private Message processReply(Object reply) {
2363-
if (reply instanceof Message) {
2364-
return (Message) reply;
2368+
try {
2369+
return this.future.get(timeout, unit);
23652370
}
2366-
else if (reply instanceof AmqpException) {
2367-
throw (AmqpException) reply;
2371+
catch (ExecutionException e) {
2372+
throw RabbitExceptionTranslator.convertRabbitAccessException(e.getCause());
23682373
}
2369-
else {
2370-
throw new AmqpException("Unexpected reply type " + reply.getClass().getName());
2374+
catch (TimeoutException e) {
2375+
return null;
23712376
}
23722377
}
23732378

23742379
public void reply(Message reply) {
2375-
this.queue.add(reply);
2380+
this.future.complete(reply);
23762381
}
23772382

23782383
public void returned(AmqpMessageReturnedException e) {
2379-
this.queue.add(e);
2384+
completeExceptionally(e);
2385+
}
2386+
2387+
public void completeExceptionally(Throwable t) {
2388+
this.future.completeExceptionally(t);
23802389
}
23812390

23822391
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

+50
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
import static org.hamcrest.Matchers.containsString;
2020
import static org.hamcrest.Matchers.equalTo;
21+
import static org.hamcrest.Matchers.instanceOf;
2122
import static org.junit.Assert.assertEquals;
2223
import static org.junit.Assert.assertSame;
2324
import static org.junit.Assert.assertThat;
2425
import static org.junit.Assert.assertTrue;
26+
import static org.junit.Assert.fail;
2527
import static org.mockito.ArgumentMatchers.any;
2628
import static org.mockito.ArgumentMatchers.anyBoolean;
2729
import static org.mockito.ArgumentMatchers.anyString;
@@ -36,7 +38,10 @@
3638
import java.util.Collections;
3739
import java.util.HashMap;
3840
import java.util.Map;
41+
import java.util.concurrent.CountDownLatch;
3942
import java.util.concurrent.ExecutorService;
43+
import java.util.concurrent.Executors;
44+
import java.util.concurrent.TimeUnit;
4045
import java.util.concurrent.atomic.AtomicBoolean;
4146
import java.util.concurrent.atomic.AtomicInteger;
4247
import java.util.concurrent.atomic.AtomicReference;
@@ -47,6 +52,7 @@
4752
import org.mockito.Mockito;
4853

4954
import org.springframework.amqp.AmqpAuthenticationException;
55+
import org.springframework.amqp.AmqpException;
5056
import org.springframework.amqp.core.Address;
5157
import org.springframework.amqp.core.Message;
5258
import org.springframework.amqp.core.MessageProperties;
@@ -77,6 +83,8 @@
7783
import com.rabbitmq.client.ConnectionFactory;
7884
import com.rabbitmq.client.Consumer;
7985
import com.rabbitmq.client.Envelope;
86+
import com.rabbitmq.client.ShutdownListener;
87+
import com.rabbitmq.client.ShutdownSignalException;
8088
import com.rabbitmq.client.impl.AMQImpl;
8189
import com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk;
8290

@@ -365,6 +373,48 @@ public void testNestedTxBinding() throws Exception {
365373
verify(channel1).txCommit();
366374
}
367375

376+
@Test
377+
public void testShutdownWhileWaitingForReply() throws Exception {
378+
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
379+
Connection mockConnection = mock(Connection.class);
380+
Channel mockChannel = mock(Channel.class);
381+
382+
given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
383+
given(mockConnection.isOpen()).willReturn(true);
384+
given(mockConnection.createChannel()).willReturn(mockChannel);
385+
given(mockChannel.queueDeclare()).willReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
386+
final AtomicReference<ShutdownListener> listener = new AtomicReference<>();
387+
final CountDownLatch shutdownLatch = new CountDownLatch(1);
388+
willAnswer(invocation -> {
389+
listener.set(invocation.getArgument(0));
390+
shutdownLatch.countDown();
391+
return null;
392+
}).given(mockChannel).addShutdownListener(any());
393+
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(mockConnectionFactory);
394+
connectionFactory.setExecutor(mock(ExecutorService.class));
395+
RabbitTemplate template = new RabbitTemplate(connectionFactory);
396+
template.setReplyTimeout(60_000);
397+
Message input = new Message("Hello, world!".getBytes(), new MessageProperties());
398+
ExecutorService exec = Executors.newSingleThreadExecutor();
399+
exec.execute(() -> {
400+
try {
401+
shutdownLatch.await(10, TimeUnit.SECONDS);
402+
}
403+
catch (InterruptedException e) {
404+
Thread.currentThread().interrupt();
405+
}
406+
listener.get().shutdownCompleted(new ShutdownSignalException(true, false, null, null));
407+
});
408+
try {
409+
template.doSendAndReceiveWithTemporary("foo", "bar", input, null);
410+
fail("Expected exception");
411+
}
412+
catch (AmqpException e) {
413+
assertThat(e.getCause(), instanceOf(ShutdownSignalException.class));
414+
}
415+
exec.shutdownNow();
416+
}
417+
368418
@SuppressWarnings("serial")
369419
private class TestTransactionManager extends AbstractPlatformTransactionManager {
370420

0 commit comments

Comments
 (0)