Skip to content

Commit 7bf27de

Browse files
Will Drosteartembilan
Will Droste
authored andcommitted
AMQP-824: Name for deferredCloseExec thread pool
JIRA https://jira.spring.io/browse/AMQP-824 Taking the comments into account Fix build * Polishing for code style **Cherry-pick to 2.0.x & 2.1.x**
1 parent 922994f commit 7bf27de

File tree

3 files changed

+85
-29
lines changed

3 files changed

+85
-29
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
5151
* @author Gary Russell
5252
* @author Steve Powell
5353
* @author Artem Bilan
54+
* @author Will Droste
5455
*
5556
*/
5657
public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware {
@@ -343,6 +344,16 @@ public void setBeanName(String name) {
343344
this.beanName = name;
344345
}
345346

347+
/**
348+
* Return a bean name of the component or null if not a bean.
349+
* @return the bean name or null.
350+
* @since 1.7.9
351+
*/
352+
protected String getBeanName() {
353+
return this.beanName;
354+
}
355+
356+
346357
protected final Connection createBareConnection() {
347358
try {
348359
String connectionName = this.connectionNameStrategy == null ? null

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.Executors;
3737
import java.util.concurrent.LinkedBlockingDeque;
3838
import java.util.concurrent.Semaphore;
39+
import java.util.concurrent.ThreadFactory;
3940
import java.util.concurrent.TimeUnit;
4041
import java.util.concurrent.TimeoutException;
4142
import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,6 +57,7 @@
5657
import org.springframework.context.event.ContextClosedEvent;
5758
import org.springframework.jmx.export.annotation.ManagedAttribute;
5859
import org.springframework.jmx.export.annotation.ManagedResource;
60+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
5961
import org.springframework.util.Assert;
6062
import org.springframework.util.ObjectUtils;
6163
import org.springframework.util.StringUtils;
@@ -92,14 +94,22 @@
9294
* @author Gary Russell
9395
* @author Artem Bilan
9496
* @author Steve Powell
97+
* @author Will Droste
9598
*/
9699
@ManagedResource
97100
public class CachingConnectionFactory extends AbstractConnectionFactory
98101
implements InitializingBean, ShutdownListener, ApplicationContextAware, ApplicationListener<ContextClosedEvent>,
99-
PublisherCallbackChannelConnectionFactory {
102+
PublisherCallbackChannelConnectionFactory {
100103

101104
private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;
102105

106+
private static final String DEFAULT_DEFERRED_POOL_PREFIX = "spring-rabbit-deferred-pool-";
107+
108+
/**
109+
* Create a unique ID for the pool
110+
*/
111+
private static final AtomicInteger threadPoolId = new AtomicInteger();
112+
103113
private static final Set<String> txStarts = new HashSet<String>(Arrays.asList("basicPublish", "basicAck", "basicNack",
104114
"basicReject"));
105115

@@ -122,10 +132,10 @@ public enum CacheMode {
122132
new HashSet<ChannelCachingConnectionProxy>();
123133

124134
private final Map<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>>
125-
allocatedConnectionNonTransactionalChannels = new HashMap<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>>();
135+
allocatedConnectionNonTransactionalChannels = new HashMap<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>>();
126136

127137
private final Map<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>>
128-
allocatedConnectionTransactionalChannels = new HashMap<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>>();
138+
allocatedConnectionTransactionalChannels = new HashMap<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>>();
129139

130140
private final BlockingDeque<ChannelCachingConnectionProxy> idleConnections =
131141
new LinkedBlockingDeque<ChannelCachingConnectionProxy>();
@@ -166,12 +176,15 @@ public enum CacheMode {
166176

167177
private volatile ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger();
168178

169-
/** Synchronization monitor for the shared Connection */
179+
/**
180+
* Synchronization monitor for the shared Connection
181+
*/
170182
private final Object connectionMonitor = new Object();
171183

172-
/** Executor used for deferred close if no explicit executor set. */
173-
private final ExecutorService deferredCloseExecutor = Executors.newCachedThreadPool();
174-
184+
/**
185+
* Executor used for deferred close if no explicit executor set.
186+
*/
187+
private ExecutorService deferredCloseExecutor;
175188

176189
/**
177190
* Create a new CachingConnectionFactory initializing the hostname to be the value returned from
@@ -406,7 +419,7 @@ private Channel getChannel(ChannelCachingConnectionProxy connection, boolean tra
406419
}
407420
if (logger.isDebugEnabled()) {
408421
logger.debug(
409-
"Acquired permit for " + connection + ", remaining:" + checkoutPermits.availablePermits());
422+
"Acquired permit for " + connection + ", remaining:" + checkoutPermits.availablePermits());
410423
}
411424
}
412425
catch (InterruptedException e) {
@@ -486,7 +499,7 @@ private Channel getChannel(ChannelCachingConnectionProxy connection, boolean tra
486499
checkoutPermits.release();
487500
if (logger.isDebugEnabled()) {
488501
logger.debug("Could not get channel; released permit for " + connection + ", remaining:"
489-
+ checkoutPermits.availablePermits());
502+
+ checkoutPermits.availablePermits());
490503
}
491504
}
492505
throw e;
@@ -692,7 +705,9 @@ public final void destroy() {
692705
resetConnection();
693706
if (this.contextStopped) {
694707
this.stopped = true;
695-
this.deferredCloseExecutor.shutdownNow();
708+
if (this.deferredCloseExecutor != null) {
709+
this.deferredCloseExecutor.shutdownNow();
710+
}
696711
}
697712
}
698713

@@ -820,12 +835,12 @@ public Properties getCacheProperties() {
820835
props.setProperty("connectionCacheSize", Integer.toString(this.connectionCacheSize));
821836
props.setProperty("openConnections", Integer.toString(countOpenConnections()));
822837
props.setProperty("idleConnections", Integer.toString(this.idleConnections.size()));
823-
props.setProperty("idleConnectionsHighWater", Integer.toString(this.connectionHighWaterMark.get()));
838+
props.setProperty("idleConnectionsHighWater", Integer.toString(this.connectionHighWaterMark.get()));
824839
for (ChannelCachingConnectionProxy proxy : this.allocatedConnections) {
825840
putConnectionName(props, proxy, ":" + proxy.getLocalPort());
826841
}
827842
for (Entry<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>> entry :
828-
this.allocatedConnectionTransactionalChannels.entrySet()) {
843+
this.allocatedConnectionTransactionalChannels.entrySet()) {
829844
int port = entry.getKey().getLocalPort();
830845
if (port > 0 && entry.getKey().isOpen()) {
831846
LinkedList<ChannelProxy> channelList = entry.getValue();
@@ -835,7 +850,7 @@ public Properties getCacheProperties() {
835850
}
836851
}
837852
for (Entry<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>> entry :
838-
this.allocatedConnectionNonTransactionalChannels.entrySet()) {
853+
this.allocatedConnectionNonTransactionalChannels.entrySet()) {
839854
int port = entry.getKey().getLocalPort();
840855
if (port > 0 && entry.getKey().isOpen()) {
841856
LinkedList<ChannelProxy> channelList = entry.getValue();
@@ -880,6 +895,28 @@ private int countOpenConnections() {
880895
return n;
881896
}
882897

898+
/**
899+
* Determine the executor service used to close connections.
900+
* @return specified executor service otherwise the default one is created and returned.
901+
* @since 1.7.9
902+
*/
903+
protected ExecutorService getDeferredCloseExecutor() {
904+
if (getExecutorService() != null) {
905+
return getExecutorService();
906+
}
907+
synchronized (this.connectionMonitor) {
908+
if (this.deferredCloseExecutor == null) {
909+
final String threadPrefix =
910+
getBeanName() == null
911+
? DEFAULT_DEFERRED_POOL_PREFIX + threadPoolId.incrementAndGet()
912+
: getBeanName();
913+
ThreadFactory threadPoolFactory = new CustomizableThreadFactory(threadPrefix);
914+
this.deferredCloseExecutor = Executors.newCachedThreadPool(threadPoolFactory);
915+
}
916+
}
917+
return this.deferredCloseExecutor;
918+
}
919+
883920
@Override
884921
public String toString() {
885922
return "CachingConnectionFactory [channelCacheSize=" + this.channelCacheSize + ", host=" + getHost()
@@ -1033,7 +1070,7 @@ private void releasePermitIfNecessary(Object proxy) {
10331070
checkoutPermits.release();
10341071
if (logger.isDebugEnabled()) {
10351072
logger.debug("Released permit for '" + this.theConnection + "', remaining: "
1036-
+ checkoutPermits.availablePermits());
1073+
+ checkoutPermits.availablePermits());
10371074
}
10381075
}
10391076
else {
@@ -1097,9 +1134,7 @@ private void physicalClose() throws Exception {
10971134
if (CachingConnectionFactory.this.active &&
10981135
(CachingConnectionFactory.this.publisherConfirms ||
10991136
CachingConnectionFactory.this.publisherReturns)) {
1100-
ExecutorService executorService = (getExecutorService() != null
1101-
? getExecutorService()
1102-
: CachingConnectionFactory.this.deferredCloseExecutor);
1137+
ExecutorService executorService = getDeferredCloseExecutor();
11031138
final Channel channel = CachedChannelInvocationHandler.this.target;
11041139
executorService.execute(new Runnable() {
11051140

@@ -1116,14 +1151,18 @@ public void run() {
11161151
catch (InterruptedException e) {
11171152
Thread.currentThread().interrupt();
11181153
}
1119-
catch (Exception e) { }
1154+
catch (Exception e) {
1155+
}
11201156
finally {
11211157
try {
11221158
channel.close();
11231159
}
1124-
catch (IOException e) { }
1125-
catch (AlreadyClosedException e) { }
1126-
catch (TimeoutException e) { }
1160+
catch (IOException e) {
1161+
}
1162+
catch (AlreadyClosedException e) {
1163+
}
1164+
catch (TimeoutException e) {
1165+
}
11271166
catch (ShutdownSignalException e) {
11281167
if (!RabbitUtils.isNormalShutdown(e)) {
11291168
logger.debug("Unexpected exception on deferred close", e);
@@ -1253,8 +1292,8 @@ public int getLocalPort() {
12531292
@Override
12541293
public String toString() {
12551294
return "Proxy@" + ObjectUtils.getIdentityHexString(this) + " "
1256-
+ (CachingConnectionFactory.this.cacheMode == CacheMode.CHANNEL ? "Shared " : "Dedicated ")
1257-
+ "Rabbit Connection: " + this.target;
1295+
+ (CachingConnectionFactory.this.cacheMode == CacheMode.CHANNEL ? "Shared " : "Dedicated ")
1296+
+ "Rabbit Connection: " + this.target;
12581297
}
12591298

12601299
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerPlaceholderParserTests.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2016 the original author or authors.
2+
* Copyright 2010-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import static org.junit.Assert.assertTrue;
2121

2222
import java.util.Arrays;
23+
import java.util.concurrent.ExecutorService;
2324
import java.util.concurrent.ThreadPoolExecutor;
2425

2526
import org.junit.After;
@@ -42,6 +43,7 @@
4243
/**
4344
* @author Dave Syer
4445
* @author Gary Russell
46+
* @author Will Droste
4547
*/
4648
public final class ListenerContainerPlaceholderParserTests {
4749

@@ -58,14 +60,18 @@ public void closeBeanFactory() throws Exception {
5860
if (this.context != null) {
5961
CachingConnectionFactory cf = this.context.getBean(CachingConnectionFactory.class);
6062
this.context.close();
61-
assertTrue(TestUtils.getPropertyValue(cf, "deferredCloseExecutor", ThreadPoolExecutor.class)
62-
.isTerminated());
63+
ExecutorService es = TestUtils.getPropertyValue(cf, "deferredCloseExecutor", ThreadPoolExecutor.class);
64+
if (es != null) {
65+
// if it gets started make sure its terminated..
66+
assertTrue(es.isTerminated());
67+
}
6368
}
6469
}
6570

6671
@Test
6772
public void testParseWithQueueNames() throws Exception {
68-
SimpleMessageListenerContainer container = this.context.getBean("testListener", SimpleMessageListenerContainer.class);
73+
SimpleMessageListenerContainer container =
74+
this.context.getBean("testListener", SimpleMessageListenerContainer.class);
6975
assertEquals(AcknowledgeMode.MANUAL, container.getAcknowledgeMode());
7076
assertEquals(this.context.getBean(ConnectionFactory.class), container.getConnectionFactory());
7177
assertEquals(MessageListenerAdapter.class, container.getMessageListener().getClass());

0 commit comments

Comments
 (0)