Skip to content

Commit 4b78c20

Browse files
garyrussellartembilan
authored andcommitted
AMQP-788 Add delegate publisher connection factory
JIRA: https://jira.spring.io/browse/AMQP-788 To avoid deadlocks, it is best to use a different connection for producers and consumers (unless the producer is partiticipating in a consumer transaction). - Add a delegate `publisherConnectionFactory` to the `CachingConnecetionFactory`. - Use the same underlying `com.rabbitmq.client.ConnectionFactory` in each. - Propagate all properties to the delegate. - Except, enhance the connection name and bean name with `.publisher`. - Add a boolean `usePublisherConnection` to the `RabbitTemplate`. - If true, use `createPublisherConnection()` when appropriate. Polishing - PR Comments Polishing - More PR Comments and Fix doSendAndReceiveWithDirect() to use the publishing CF. More polishing; tests + reinstate overloaded execute(). Fix (old) race condition in testReceiveAndReplyNonBlocking. https://travis-ci.org/spring-projects/spring-amqp/builds/317052188?utm_source=github_status&utm_medium=notification Polishing Docs
1 parent c576b27 commit 4b78c20

File tree

11 files changed

+410
-38
lines changed

11 files changed

+410
-38
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware,
6464
ApplicationContextAware, ApplicationEventPublisherAware, ApplicationListener<ContextClosedEvent> {
6565

66+
private static final String PUBLISHER_SUFFIX = ".publisher";
67+
6668
public static final int DEFAULT_CLOSE_TIMEOUT = 30000;
6769

6870
private static final String BAD_URI = "setUri() was passed an invalid URI; it is ignored";
@@ -77,6 +79,8 @@ public abstract class AbstractConnectionFactory implements ConnectionFactory, Di
7779

7880
private final AtomicInteger defaultConnectionNameStrategyCounter = new AtomicInteger();
7981

82+
private AbstractConnectionFactory publisherConnectionFactory;
83+
8084
private RecoveryListener recoveryListener = new RecoveryListener() {
8185

8286
@Override
@@ -115,17 +119,26 @@ public void handleRecovery(Recoverable recoverable) {
115119
private volatile boolean contextStopped;
116120

117121
/**
118-
* Create a new AbstractConnectionFactory for the given target ConnectionFactory.
122+
* Create a new AbstractConnectionFactory for the given target ConnectionFactory,
123+
* with no publisher connection factory.
119124
* @param rabbitConnectionFactory the target ConnectionFactory
120125
*/
121126
public AbstractConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory) {
122127
Assert.notNull(rabbitConnectionFactory, "Target ConnectionFactory must not be null");
123128
this.rabbitConnectionFactory = rabbitConnectionFactory;
124129
}
125130

131+
protected final void setPublisherConnectionFactory(
132+
AbstractConnectionFactory publisherConnectionFactory) {
133+
this.publisherConnectionFactory = publisherConnectionFactory;
134+
}
135+
126136
@Override
127137
public void setApplicationContext(ApplicationContext applicationContext) {
128138
this.applicationContext = applicationContext;
139+
if (this.publisherConnectionFactory != null) {
140+
this.publisherConnectionFactory.setApplicationContext(applicationContext);
141+
}
129142
}
130143

131144
protected ApplicationContext getApplicationContext() {
@@ -135,6 +148,9 @@ protected ApplicationContext getApplicationContext() {
135148
@Override
136149
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
137150
this.applicationEventPublisher = applicationEventPublisher;
151+
if (this.publisherConnectionFactory != null) {
152+
this.publisherConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
153+
}
138154
}
139155

140156
protected ApplicationEventPublisher getApplicationEventPublisher() {
@@ -146,6 +162,9 @@ public void onApplicationEvent(ContextClosedEvent event) {
146162
if (getApplicationContext() == event.getApplicationContext()) {
147163
this.contextStopped = true;
148164
}
165+
if (this.publisherConnectionFactory != null) {
166+
this.publisherConnectionFactory.onApplicationEvent(event);
167+
}
149168
}
150169

151170
protected boolean getContextStopped() {
@@ -261,6 +280,9 @@ public void setAddresses(String addresses) {
261280
Address[] addressArray = Address.parseAddresses(addresses);
262281
if (addressArray.length > 0) {
263282
this.addresses = addressArray;
283+
if (this.publisherConnectionFactory != null) {
284+
this.publisherConnectionFactory.setAddresses(addresses);
285+
}
264286
return;
265287
}
266288
}
@@ -288,21 +310,34 @@ protected ChannelListener getChannelListener() {
288310

289311
public void setConnectionListeners(List<? extends ConnectionListener> listeners) {
290312
this.connectionListener.setDelegates(listeners);
313+
if (this.publisherConnectionFactory != null) {
314+
this.publisherConnectionFactory.setConnectionListeners(listeners);
315+
}
291316
}
292317

293318
@Override
294319
public void addConnectionListener(ConnectionListener listener) {
295320
this.connectionListener.addDelegate(listener);
321+
if (this.publisherConnectionFactory != null) {
322+
this.publisherConnectionFactory.addConnectionListener(listener);
323+
}
296324
}
297325

298326
@Override
299327
public boolean removeConnectionListener(ConnectionListener listener) {
300-
return this.connectionListener.removeDelegate(listener);
328+
boolean result = this.connectionListener.removeDelegate(listener);
329+
if (this.publisherConnectionFactory != null) {
330+
this.publisherConnectionFactory.removeConnectionListener(listener); // NOSONAR
331+
}
332+
return result;
301333
}
302334

303335
@Override
304336
public void clearConnectionListeners() {
305337
this.connectionListener.clearDelegates();
338+
if (this.publisherConnectionFactory != null) {
339+
this.publisherConnectionFactory.clearConnectionListeners();
340+
}
306341
}
307342

308343
public void setChannelListeners(List<? extends ChannelListener> listeners) {
@@ -316,10 +351,16 @@ public void setChannelListeners(List<? extends ChannelListener> listeners) {
316351
*/
317352
public void setRecoveryListener(RecoveryListener recoveryListener) {
318353
this.recoveryListener = recoveryListener;
354+
if (this.publisherConnectionFactory != null) {
355+
this.publisherConnectionFactory.setRecoveryListener(recoveryListener);
356+
}
319357
}
320358

321359
public void addChannelListener(ChannelListener listener) {
322360
this.channelListener.addDelegate(listener);
361+
if (this.publisherConnectionFactory != null) {
362+
this.publisherConnectionFactory.addChannelListener(listener);
363+
}
323364
}
324365

325366
/**
@@ -340,6 +381,9 @@ public void setExecutor(Executor executor) {
340381
else {
341382
this.executorService = ((ThreadPoolTaskExecutor) executor).getThreadPoolExecutor();
342383
}
384+
if (this.publisherConnectionFactory != null) {
385+
this.publisherConnectionFactory.setExecutor(executor);
386+
}
343387
}
344388

345389
protected ExecutorService getExecutorService() {
@@ -353,6 +397,9 @@ protected ExecutorService getExecutorService() {
353397
*/
354398
public void setCloseTimeout(int closeTimeout) {
355399
this.closeTimeout = closeTimeout;
400+
if (this.publisherConnectionFactory != null) {
401+
this.publisherConnectionFactory.setCloseTimeout(closeTimeout);
402+
}
356403
}
357404

358405
public int getCloseTimeout() {
@@ -367,11 +414,27 @@ public int getCloseTimeout() {
367414
*/
368415
public void setConnectionNameStrategy(ConnectionNameStrategy connectionNameStrategy) {
369416
this.connectionNameStrategy = connectionNameStrategy;
417+
if (this.publisherConnectionFactory != null) {
418+
this.publisherConnectionFactory.setConnectionNameStrategy(
419+
cf -> connectionNameStrategy.obtainNewConnectionName(cf) + PUBLISHER_SUFFIX);
420+
}
370421
}
371422

372423
@Override
373424
public void setBeanName(String name) {
374425
this.beanName = name;
426+
if (this.publisherConnectionFactory != null) {
427+
this.publisherConnectionFactory.setBeanName(name + PUBLISHER_SUFFIX);
428+
}
429+
}
430+
431+
public boolean hasPublisherConnectionFactory() {
432+
return this.publisherConnectionFactory != null;
433+
}
434+
435+
@Override
436+
public ConnectionFactory getPublisherConnectionFactory() {
437+
return this.publisherConnectionFactory;
375438
}
376439

377440
protected final Connection createBareConnection() {
@@ -430,6 +493,9 @@ protected final String getDefaultHostName() {
430493

431494
@Override
432495
public void destroy() {
496+
if (this.publisherConnectionFactory != null) {
497+
this.publisherConnectionFactory.destroy();
498+
}
433499
}
434500

435501
@Override

0 commit comments

Comments
 (0)