Skip to content

Commit b896c89

Browse files
Will Drosteartembilan
Will Droste
authored andcommitted
AMQP-824: Name for deferredCloseExec thread pool
JIRA https://jira.spring.io/browse/AMQP-824 Taking the comments into account Fix build * Polishing for code style **Cherry-pick to 2.0.x & 2.1.x** # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java
1 parent bac9aa2 commit b896c89

File tree

3 files changed

+60
-13
lines changed

3 files changed

+60
-13
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -58,6 +58,7 @@
5858
* @author Gary Russell
5959
* @author Steve Powell
6060
* @author Artem Bilan
61+
* @author Will Droste
6162
*
6263
*/
6364
public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware,
@@ -428,7 +429,14 @@ public void setBeanName(String name) {
428429
}
429430
}
430431

431-
public boolean hasPublisherConnectionFactory() {
432+
/**
433+
* Return a bean name of the component or null if not a bean.
434+
* @return the bean name or null.
435+
* @since 1.7.9
436+
*/
437+
protected String getBeanName() {
438+
return this.beanName;
439+
} public boolean hasPublisherConnectionFactory() {
432440
return this.publisherConnectionFactory != null;
433441
}
434442

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.concurrent.Executors;
4040
import java.util.concurrent.LinkedBlockingDeque;
4141
import java.util.concurrent.Semaphore;
42+
import java.util.concurrent.ThreadFactory;
4243
import java.util.concurrent.TimeUnit;
4344
import java.util.concurrent.TimeoutException;
4445
import java.util.concurrent.atomic.AtomicBoolean;
@@ -55,6 +56,7 @@
5556
import org.springframework.beans.factory.InitializingBean;
5657
import org.springframework.jmx.export.annotation.ManagedAttribute;
5758
import org.springframework.jmx.export.annotation.ManagedResource;
59+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
5860
import org.springframework.util.Assert;
5961
import org.springframework.util.ObjectUtils;
6062
import org.springframework.util.StringUtils;
@@ -92,13 +94,21 @@
9294
* @author Gary Russell
9395
* @author Artem Bilan
9496
* @author Steve Powell
97+
* @author Will Droste
9598
*/
9699
@ManagedResource
97100
public class CachingConnectionFactory extends AbstractConnectionFactory
98101
implements InitializingBean, ShutdownListener {
99102

100103
private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;
101104

105+
private static final String DEFAULT_DEFERRED_POOL_PREFIX = "spring-rabbit-deferred-pool-";
106+
107+
/**
108+
* Create a unique ID for the pool.
109+
*/
110+
private static final AtomicInteger threadPoolId = new AtomicInteger();
111+
102112
private static final Set<String> txStarts = new HashSet<>(Arrays.asList("basicPublish", "basicAck",
103113
"basicNack", "basicReject"));
104114

@@ -148,9 +158,6 @@ public enum CacheMode {
148158
/** Synchronization monitor for the shared Connection. */
149159
private final Object connectionMonitor = new Object();
150160

151-
/** Executor used for deferred close if no explicit executor set. */
152-
private final ExecutorService deferredCloseExecutor = Executors.newCachedThreadPool();
153-
154161
private long channelCheckoutTimeout = 0;
155162

156163
private CacheMode cacheMode = CacheMode.CHANNEL;
@@ -172,6 +179,10 @@ public enum CacheMode {
172179
private volatile boolean active = true;
173180

174181
private volatile boolean initialized;
182+
/**
183+
* Executor used for deferred close if no explicit executor set.
184+
*/
185+
private ExecutorService deferredCloseExecutor;
175186

176187
private volatile boolean stopped;
177188

@@ -764,7 +775,9 @@ public final void destroy() {
764775
resetConnection();
765776
if (getContextStopped()) {
766777
this.stopped = true;
767-
this.deferredCloseExecutor.shutdownNow();
778+
if (this.deferredCloseExecutor != null) {
779+
this.deferredCloseExecutor.shutdownNow();
780+
}
768781
}
769782
}
770783

@@ -910,6 +923,28 @@ private int countOpenConnections() {
910923
return n;
911924
}
912925

926+
/**
927+
* Determine the executor service used to close connections.
928+
* @return specified executor service otherwise the default one is created and returned.
929+
* @since 1.7.9
930+
*/
931+
protected ExecutorService getDeferredCloseExecutor() {
932+
if (getExecutorService() != null) {
933+
return getExecutorService();
934+
}
935+
synchronized (this.connectionMonitor) {
936+
if (this.deferredCloseExecutor == null) {
937+
final String threadPrefix =
938+
getBeanName() == null
939+
? DEFAULT_DEFERRED_POOL_PREFIX + threadPoolId.incrementAndGet()
940+
: getBeanName();
941+
ThreadFactory threadPoolFactory = new CustomizableThreadFactory(threadPrefix);
942+
this.deferredCloseExecutor = Executors.newCachedThreadPool(threadPoolFactory);
943+
}
944+
}
945+
return this.deferredCloseExecutor;
946+
}
947+
913948
@Override
914949
public String toString() {
915950
return "CachingConnectionFactory [channelCacheSize=" + this.channelCacheSize + ", host=" + getHost()
@@ -1187,9 +1222,7 @@ private void physicalClose() throws Exception {
11871222
}
11881223

11891224
private void asyncClose() {
1190-
ExecutorService executorService = (getExecutorService() != null
1191-
? getExecutorService()
1192-
: CachingConnectionFactory.this.deferredCloseExecutor);
1225+
ExecutorService executorService = getDeferredCloseExecutor();
11931226
final Channel channel = CachedChannelInvocationHandler.this.target;
11941227
executorService.execute(() -> {
11951228
try {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerPlaceholderParserTests.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2016 the original author or authors.
2+
* Copyright 2010-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import static org.junit.Assert.assertTrue;
2121

2222
import java.util.Arrays;
23+
import java.util.concurrent.ExecutorService;
2324
import java.util.concurrent.ThreadPoolExecutor;
2425

2526
import org.junit.After;
@@ -42,6 +43,7 @@
4243
/**
4344
* @author Dave Syer
4445
* @author Gary Russell
46+
* @author Will Droste
4547
*/
4648
public final class ListenerContainerPlaceholderParserTests {
4749

@@ -58,14 +60,18 @@ public void closeBeanFactory() throws Exception {
5860
if (this.context != null) {
5961
CachingConnectionFactory cf = this.context.getBean(CachingConnectionFactory.class);
6062
this.context.close();
61-
assertTrue(TestUtils.getPropertyValue(cf, "deferredCloseExecutor", ThreadPoolExecutor.class)
62-
.isTerminated());
63+
ExecutorService es = TestUtils.getPropertyValue(cf, "deferredCloseExecutor", ThreadPoolExecutor.class);
64+
if (es != null) {
65+
// if it gets started make sure its terminated..
66+
assertTrue(es.isTerminated());
67+
}
6368
}
6469
}
6570

6671
@Test
6772
public void testParseWithQueueNames() throws Exception {
68-
SimpleMessageListenerContainer container = this.context.getBean("testListener", SimpleMessageListenerContainer.class);
73+
SimpleMessageListenerContainer container =
74+
this.context.getBean("testListener", SimpleMessageListenerContainer.class);
6975
assertEquals(AcknowledgeMode.MANUAL, container.getAcknowledgeMode());
7076
assertEquals(this.context.getBean(ConnectionFactory.class), container.getConnectionFactory());
7177
assertEquals(MessageListenerAdapter.class, container.getMessageListener().getClass());

0 commit comments

Comments
 (0)