Skip to content

Commit f786c5b

Browse files
garyrussellartembilan
authored andcommitted
AMQP-796: Fix Admin Transaction
JIRA: https://jira.spring.io/browse/AMQP-796 If an admin uses a transactional `RabbitTemplate` it will start a transaction. If the connection was opened due to a `RabbitTemplate` operation it should participate in the same transaction. Previously, the template used a second channel and treated it as a local transaction. Also fix the `RabbitAdmin` so it does no work if there is nothing to declare. # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminDeclarationTests.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java * Remove `RabbitTemplateTests` changes since they are not related to the current state of the `RabbitAdmin`: the `RabbitTemplate`-based constructor has been introduced since version `2.0`
1 parent f78d916 commit f786c5b

File tree

5 files changed

+67
-13
lines changed

5 files changed

+67
-13
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -138,6 +138,18 @@ private static RabbitResourceHolder doGetTransactionalResourceHolder(ConnectionF
138138
channel = ConsumerChannelRegistry.getConsumerChannel(connectionFactory);
139139
if (channel == null && connection == null) {
140140
connection = resourceFactory.createConnection();
141+
if (resourceHolder == null) {
142+
/*
143+
* While creating a connection, a connection listener might have created a
144+
* transactional channel and bound it to the transaction.
145+
*/
146+
resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
147+
.getResource(connectionFactory);
148+
if (resourceHolder != null) {
149+
channel = resourceHolder.getChannel();
150+
resourceHolderToUse = resourceHolder;
151+
}
152+
}
141153
resourceHolderToUse.addConnection(connection);
142154
}
143155
if (channel == null) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -497,6 +497,10 @@ else if (declarable instanceof Binding) {
497497
}
498498
}
499499

500+
if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
501+
this.logger.debug("Nothing to declare");
502+
return;
503+
}
500504
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
501505
@Override
502506
public Object doInRabbit(Channel channel) throws Exception {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminDeclarationTests.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
import static org.junit.Assert.assertNotNull;
2222
import static org.junit.Assert.assertThat;
2323
import static org.junit.Assert.fail;
24+
import static org.mockito.BDDMockito.given;
25+
import static org.mockito.BDDMockito.willReturn;
2426
import static org.mockito.Matchers.any;
2527
import static org.mockito.Matchers.anyBoolean;
2628
import static org.mockito.Matchers.anyMap;
29+
import static org.mockito.Matchers.anyString;
2730
import static org.mockito.Matchers.eq;
2831
import static org.mockito.Matchers.isNull;
2932
import static org.mockito.Mockito.doAnswer;
@@ -55,6 +58,7 @@
5558
import org.springframework.amqp.rabbit.connection.Connection;
5659
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
5760
import org.springframework.amqp.rabbit.connection.ConnectionListener;
61+
import org.springframework.context.ApplicationContext;
5862
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
5963
import org.springframework.context.annotation.Bean;
6064
import org.springframework.context.annotation.Configuration;
@@ -115,11 +119,14 @@ public void testNoDeclareWithCachedConnections() throws Exception {
115119
final List<Channel> mockChannels = new ArrayList<Channel>();
116120

117121
doAnswer(new Answer<com.rabbitmq.client.Connection>() {
122+
118123
private int connectionNumber;
124+
119125
@Override
120126
public com.rabbitmq.client.Connection answer(InvocationOnMock invocation) throws Throwable {
121127
com.rabbitmq.client.Connection connection = mock(com.rabbitmq.client.Connection.class);
122128
doAnswer(new Answer<Channel>() {
129+
123130
private int channelNumber;
124131

125132
@Override
@@ -154,7 +161,7 @@ public Channel answer(InvocationOnMock invocation) throws Throwable {
154161
ccf.createConnection().close();
155162
ccf.destroy();
156163

157-
assertEquals("Admin should not have created a channel", 0, mockChannels.size());
164+
assertEquals("Admin should not have created a channel", 0, mockChannels.size());
158165
}
159166

160167
@Test
@@ -234,7 +241,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
234241

235242
verify(channel, never()).queueDeclare(eq("foo"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
236243
verify(channel, never())
237-
.exchangeDeclare(eq("bar"), eq("direct"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
244+
.exchangeDeclare(eq("bar"), eq("direct"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
238245
verify(channel, never()).queueBind(eq("foo"), eq("bar"), eq("foo"), any(Map.class));
239246
}
240247

@@ -275,7 +282,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
275282

276283
verify(channel, never()).queueDeclare(eq("foo"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
277284
verify(channel, never())
278-
.exchangeDeclare(eq("bar"), eq("direct"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
285+
.exchangeDeclare(eq("bar"), eq("direct"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
279286
verify(channel, never()).queueBind(eq("foo"), eq("bar"), eq("foo"), any(Map.class));
280287
}
281288

@@ -293,7 +300,7 @@ public void testJavaConfig() throws Exception {
293300
.queueDeclare(eq("foo"), anyBoolean(), anyBoolean(), anyBoolean(), isNull(Map.class));
294301
verify(Config.channel2, never())
295302
.exchangeDeclare(eq("bar"), eq("direct"), anyBoolean(), anyBoolean(),
296-
anyBoolean(), anyMap());
303+
anyBoolean(), anyMap());
297304
verify(Config.channel2, never()).queueBind(eq("foo"), eq("bar"), eq("foo"), anyMap());
298305
context.close();
299306
}
@@ -308,7 +315,7 @@ public void testAddRemove() {
308315
assertEquals(2, queue.getDeclaringAdmins().size());
309316
queue.setAdminsThatShouldDeclare(admin1);
310317
assertEquals(1, queue.getDeclaringAdmins().size());
311-
queue.setAdminsThatShouldDeclare(new Object[] {null});
318+
queue.setAdminsThatShouldDeclare(new Object[] { null });
312319
assertEquals(0, queue.getDeclaringAdmins().size());
313320
queue.setAdminsThatShouldDeclare(admin1, admin2);
314321
assertEquals(2, queue.getDeclaringAdmins().size());
@@ -331,6 +338,26 @@ public void testAddRemove() {
331338
}
332339
}
333340

341+
@Test
342+
public void testNoOpWhenNothingToDeclare() throws Exception {
343+
com.rabbitmq.client.ConnectionFactory cf = mock(com.rabbitmq.client.ConnectionFactory.class);
344+
com.rabbitmq.client.Connection connection = mock(com.rabbitmq.client.Connection.class);
345+
Channel channel = mock(Channel.class, "channel1");
346+
given(channel.isOpen()).willReturn(true);
347+
willReturn(connection).given(cf).newConnection(any(ExecutorService.class), anyString());
348+
given(connection.isOpen()).willReturn(true);
349+
given(connection.createChannel()).willReturn(channel);
350+
CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
351+
ccf.setExecutor(mock(ExecutorService.class));
352+
RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf);
353+
RabbitAdmin admin = new RabbitAdmin(rabbitTemplate.getConnectionFactory());
354+
ApplicationContext ac = mock(ApplicationContext.class);
355+
admin.setApplicationContext(ac);
356+
admin.afterPropertiesSet();
357+
ccf.createConnection();
358+
verify(connection, never()).createChannel();
359+
}
360+
334361
@Configuration
335362
public static class Config {
336363

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
268268
@Override
269269
public void run() {
270270
templateWithConfirmsEnabled.execute(new ChannelCallback<Object>() {
271+
271272
@Override
272273
public Object doInRabbit(Channel channel) throws Exception {
273274
try {
@@ -325,6 +326,7 @@ public void testPublisherReturns() throws Exception {
325326
final CountDownLatch latch = new CountDownLatch(1);
326327
final List<Message> returns = new ArrayList<Message>();
327328
templateWithReturnsEnabled.setReturnCallback(new ReturnCallback() {
329+
328330
@Override
329331
public void returnedMessage(Message message, int replyCode,
330332
String replyText, String exchange, String routingKey) {
@@ -345,6 +347,7 @@ public void testPublisherReturnsWithMandatoryExpression() throws Exception {
345347
final CountDownLatch latch = new CountDownLatch(1);
346348
final List<Message> returns = new ArrayList<Message>();
347349
templateWithReturnsEnabled.setReturnCallback(new ReturnCallback() {
350+
348351
@Override
349352
public void returnedMessage(Message message, int replyCode,
350353
String replyText, String exchange, String routingKey) {
@@ -434,6 +437,7 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
434437
@Override
435438
public void run() {
436439
template.execute(new ChannelCallback<Object>() {
440+
437441
@Override
438442
public Object doInRabbit(Channel channel) throws Exception {
439443
try {
@@ -443,9 +447,9 @@ public Object doInRabbit(Channel channel) throws Exception {
443447
Thread.currentThread().interrupt();
444448
}
445449
template.doSend(channel, "", ROUTE,
446-
new SimpleMessageConverter().toMessage("message", new MessageProperties()),
447-
false,
448-
new CorrelationData("def"));
450+
new SimpleMessageConverter().toMessage("message", new MessageProperties()),
451+
false,
452+
new CorrelationData("def"));
449453
threadSentLatch.countDown();
450454
return null;
451455
}
@@ -488,10 +492,12 @@ public void testPublisherConfirmNotReceivedAged() throws Exception {
488492

489493
final AtomicInteger count = new AtomicInteger();
490494
doAnswer(new Answer<Object>() {
495+
491496
@Override
492497
public Object answer(InvocationOnMock invocation) throws Throwable {
493498
return count.incrementAndGet();
494-
} }).when(mockChannel).getNextPublishSeqNo();
499+
}
500+
}).when(mockChannel).getNextPublishSeqNo();
495501

496502
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
497503
ccf.setPublisherConfirms(true);
@@ -533,6 +539,7 @@ public void testPublisherConfirmMultiple() throws Exception {
533539

534540
final AtomicInteger count = new AtomicInteger();
535541
doAnswer(new Answer<Object>() {
542+
536543
@Override
537544
public Object answer(InvocationOnMock invocation) throws Throwable {
538545
return count.incrementAndGet();
@@ -580,6 +587,7 @@ public void testPublisherConfirmMultipleWithTwoListeners() throws Exception {
580587

581588
final AtomicInteger count = new AtomicInteger();
582589
doAnswer(new Answer<Object>() {
590+
583591
@Override
584592
public Object answer(InvocationOnMock invocation) throws Throwable {
585593
return count.incrementAndGet();
@@ -681,6 +689,7 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
681689
}
682690
});
683691
Executors.newSingleThreadExecutor().execute(new Runnable() {
692+
684693
@Override
685694
public void run() {
686695
template.convertAndSend(ROUTE, (Object) "message", new CorrelationData("abc"));
@@ -689,6 +698,7 @@ public void run() {
689698
}
690699
});
691700
Executors.newSingleThreadExecutor().execute(new Runnable() {
701+
692702
@Override
693703
public void run() {
694704
try {
@@ -921,7 +931,8 @@ public void run() {
921931
try {
922932
template.convertAndSend(ROUTE, (Object) "message", new CorrelationData("abc"));
923933
}
924-
catch (AmqpException e) { }
934+
catch (AmqpException e) {
935+
}
925936
}
926937
sentAll.countDown();
927938
}

0 commit comments

Comments
 (0)