Skip to content

Commit 14cacf1

Browse files
committed
GH-999: Sonar fix and improve tests
- verify permit is released after the deferred close
1 parent 3bcca76 commit 14cacf1

File tree

2 files changed

+63
-18
lines changed

2 files changed

+63
-18
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@
102102
public class CachingConnectionFactory extends AbstractConnectionFactory
103103
implements InitializingBean, ShutdownListener {
104104

105+
private static final String UNUSED = "unused";
106+
105107
private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;
106108

107109
private static final String DEFAULT_DEFERRED_POOL_PREFIX = "spring-rabbit-deferred-pool-";
@@ -845,7 +847,7 @@ public final void destroy() {
845847
this.logger.warn("Channel executor failed to shut down");
846848
}
847849
}
848-
catch (@SuppressWarnings("unused") InterruptedException e) {
850+
catch (@SuppressWarnings(UNUSED) InterruptedException e) {
849851
Thread.currentThread().interrupt();
850852
}
851853
}
@@ -1250,7 +1252,7 @@ private void doReturnToCache(Channel proxy) {
12501252
try {
12511253
physicalClose(proxy);
12521254
}
1253-
catch (@SuppressWarnings("unused") Exception e) {
1255+
catch (@SuppressWarnings(UNUSED) Exception e) {
12541256
}
12551257
}
12561258
}
@@ -1319,20 +1321,20 @@ private void asyncClose(Object proxy) {
13191321
Thread.sleep(ASYNC_CLOSE_TIMEOUT);
13201322
}
13211323
}
1322-
catch (@SuppressWarnings("unused") InterruptedException e1) {
1324+
catch (@SuppressWarnings(UNUSED) InterruptedException e1) {
13231325
Thread.currentThread().interrupt();
13241326
}
1325-
catch (@SuppressWarnings("unused") Exception e2) {
1327+
catch (@SuppressWarnings(UNUSED) Exception e2) {
13261328
}
13271329
finally {
13281330
try {
13291331
channel.close();
13301332
}
1331-
catch (@SuppressWarnings("unused") IOException e3) {
1333+
catch (@SuppressWarnings(UNUSED) IOException e3) {
13321334
}
1333-
catch (@SuppressWarnings("unused") AlreadyClosedException e4) {
1335+
catch (@SuppressWarnings(UNUSED) AlreadyClosedException e4) {
13341336
}
1335-
catch (@SuppressWarnings("unused") TimeoutException e5) {
1337+
catch (@SuppressWarnings(UNUSED) TimeoutException e5) {
13361338
}
13371339
catch (ShutdownSignalException e6) {
13381340
if (!RabbitUtils.isNormalShutdown(e6)) {
@@ -1346,7 +1348,7 @@ private void asyncClose(Object proxy) {
13461348
}
13471349
});
13481350
}
1349-
catch (@SuppressWarnings("unused") RuntimeException e) {
1351+
catch (@SuppressWarnings(UNUSED) RuntimeException e) {
13501352
CachingConnectionFactory.this.inFlightAsyncCloses.release(channel);
13511353
}
13521354
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777

7878
import com.rabbitmq.client.Address;
7979
import com.rabbitmq.client.Channel;
80+
import com.rabbitmq.client.ConfirmListener;
8081
import com.rabbitmq.client.ConnectionFactory;
8182
import com.rabbitmq.client.GetResponse;
8283
import com.rabbitmq.client.ShutdownSignalException;
@@ -590,35 +591,77 @@ public void testCheckoutLimitWithPublisherConfirmsPhysical() throws IOException,
590591
testCheckoutLimitWithPublisherConfirms(true);
591592
}
592593

593-
public void testCheckoutLimitWithPublisherConfirms(boolean physicalClose) throws IOException, Exception {
594+
private void testCheckoutLimitWithPublisherConfirms(boolean physicalClose) throws IOException, Exception {
594595
com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
595596
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
596-
Channel mockChannel1 = mock(Channel.class);
597+
Channel mockChannel = mock(Channel.class);
597598

598599
when(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).thenReturn(mockConnection);
599-
when(mockConnection.createChannel()).thenReturn(mockChannel1);
600+
when(mockConnection.createChannel()).thenReturn(mockChannel);
600601
when(mockConnection.isOpen()).thenReturn(true);
601602

602603
// Called during physical close
603-
when(mockChannel1.isOpen()).thenReturn(true);
604+
when(mockChannel.isOpen()).thenReturn(true);
605+
CountDownLatch confirmsLatch = new CountDownLatch(1);
606+
doAnswer(invoc -> {
607+
confirmsLatch.await(10, TimeUnit.SECONDS);
608+
return null;
609+
}).when(mockChannel).waitForConfirmsOrDie(anyLong());
610+
AtomicReference<ConfirmListener> confirmListener = new AtomicReference<>();
611+
doAnswer(invoc -> {
612+
confirmListener.set(invoc.getArgument(0));
613+
return null;
614+
}).when(mockChannel).addConfirmListener(any());
615+
when(mockChannel.getNextPublishSeqNo()).thenReturn(1L);
604616

605617
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
606-
ccf.setExecutor(mock(ExecutorService.class));
618+
ExecutorService exec = Executors.newCachedThreadPool();
619+
ccf.setExecutor(exec);
607620
ccf.setChannelCacheSize(1);
608621
ccf.setChannelCheckoutTimeout(1);
609622
ccf.setPublisherConfirms(true);
610623

611624
final Connection con = ccf.createConnection();
612625

626+
RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf);
613627
if (physicalClose) {
614-
Channel channel = con.createChannel(false);
615-
RabbitUtils.setPhysicalCloseRequired(channel, physicalClose);
616-
channel.close();
628+
Channel channel1 = con.createChannel(false);
629+
RabbitUtils.setPhysicalCloseRequired(channel1, physicalClose);
630+
channel1.close();
617631
}
618632
else {
619-
new RabbitTemplate(ccf).convertAndSend("foo", "bar"); // pending confirm
633+
rabbitTemplate.convertAndSend("foo", "bar"); // pending confirm
620634
}
621635
assertThatThrownBy(() -> con.createChannel(false)).isInstanceOf(AmqpTimeoutException.class);
636+
int n = 0;
637+
if (physicalClose) {
638+
confirmsLatch.countDown();
639+
Channel channel2 = null;
640+
while (channel2 == null && n++ < 100) {
641+
try {
642+
channel2 = con.createChannel(false);
643+
}
644+
catch (Exception e) {
645+
Thread.sleep(100);
646+
}
647+
}
648+
assertThat(channel2).isNotNull();
649+
}
650+
else {
651+
confirmListener.get().handleAck(1L, false);
652+
boolean ok = false;
653+
while (!ok && n++ < 100) {
654+
try {
655+
rabbitTemplate.convertAndSend("foo", "bar");
656+
ok = true;
657+
}
658+
catch (Exception e) {
659+
Thread.sleep(100);
660+
}
661+
}
662+
assertThat(ok).isTrue();
663+
}
664+
exec.shutdownNow();
622665
}
623666

624667
@Test
@@ -1485,7 +1528,7 @@ public void testConsumerChannelWithPubConfPhysicallyClosedWhenNotIsOpen() throws
14851528
testConsumerChannelPhysicallyClosedWhenNotIsOpenGuts(true);
14861529
}
14871530

1488-
public void testConsumerChannelPhysicallyClosedWhenNotIsOpenGuts(boolean confirms) throws Exception {
1531+
private void testConsumerChannelPhysicallyClosedWhenNotIsOpenGuts(boolean confirms) throws Exception {
14891532
ExecutorService executor = Executors.newSingleThreadExecutor();
14901533
try {
14911534
com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);

0 commit comments

Comments
 (0)