Skip to content

Commit c38ed96

Browse files
authored
GH-8643: Replace synchronized with Lock
Fixes #8643 * First pass - trivial synchronized blocks - Convert the "trivial" `synchronized` block into `ReentrantLock`. * fix checkstyle * use blocking lock * Secon pass - handle multi-lock cases * javadoc + year * addres first batch of review suggestions * fix checkstyle issues * fix the mqtt parent/child lock monitor sharing * fix the mqtt parent/child lock monitor sharing, v2 * patch the stomp test
1 parent 9e9bfd0 commit c38ed96

File tree

101 files changed

+2643
-1249
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+2643
-1249
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,8 @@
2323
import java.util.UUID;
2424
import java.util.concurrent.CompletableFuture;
2525
import java.util.concurrent.ScheduledFuture;
26+
import java.util.concurrent.locks.Lock;
27+
import java.util.concurrent.locks.ReentrantLock;
2628

2729
import org.springframework.amqp.core.MessageDeliveryMode;
2830
import org.springframework.amqp.core.ReturnedMessage;
@@ -58,6 +60,7 @@
5860
*
5961
* @author Gary Russell
6062
* @author Artem Bilan
63+
* @author Christian Tzolov
6164
*
6265
* @since 4.3
6366
*
@@ -115,6 +118,8 @@ public abstract class AbstractAmqpOutboundEndpoint extends AbstractReplyProducin
115118

116119
private volatile ScheduledFuture<?> confirmChecker;
117120

121+
private final Lock lock = new ReentrantLock();
122+
118123
/**
119124
* Set a custom {@link AmqpHeaderMapper} for mapping request and reply headers.
120125
* Defaults to {@link DefaultAmqpHeaderMapper#outboundMapper()}.
@@ -336,8 +341,14 @@ public void setConfirmTimeout(long confirmTimeout) {
336341
this.confirmTimeout = Duration.ofMillis(confirmTimeout); // NOSONAR sync inconsistency
337342
}
338343

339-
protected final synchronized void setConnectionFactory(ConnectionFactory connectionFactory) {
340-
this.connectionFactory = connectionFactory;
344+
protected final void setConnectionFactory(ConnectionFactory connectionFactory) {
345+
this.lock.lock();
346+
try {
347+
this.connectionFactory = connectionFactory;
348+
}
349+
finally {
350+
this.lock.unlock();
351+
}
341352
}
342353

343354
protected String getExchangeName() {
@@ -487,26 +498,33 @@ protected void endpointInit() {
487498
}
488499

489500
@Override
490-
public synchronized void start() {
491-
if (!this.running) {
492-
if (!this.lazyConnect && this.connectionFactory != null) {
493-
try {
494-
Connection connection = this.connectionFactory.createConnection(); // NOSONAR (close)
495-
if (connection != null) {
496-
connection.close();
501+
public void start() {
502+
this.lock.lock();
503+
try {
504+
if (!this.running) {
505+
if (!this.lazyConnect && this.connectionFactory != null) {
506+
try {
507+
Connection connection = this.connectionFactory.createConnection(); // NOSONAR (close)
508+
if (connection != null) {
509+
connection.close();
510+
}
511+
}
512+
catch (RuntimeException ex) {
513+
logger.error(ex, "Failed to eagerly establish the connection.");
497514
}
498515
}
499-
catch (RuntimeException ex) {
500-
logger.error(ex, "Failed to eagerly establish the connection.");
516+
doStart();
517+
if (this.confirmTimeout != null && getConfirmNackChannel() != null && getRabbitTemplate() != null) {
518+
this.confirmChecker = getTaskScheduler()
519+
.scheduleAtFixedRate(checkUnconfirmed(), this.confirmTimeout.dividedBy(2L));
501520
}
521+
this.running = true;
502522
}
503-
doStart();
504-
if (this.confirmTimeout != null && getConfirmNackChannel() != null && getRabbitTemplate() != null) {
505-
this.confirmChecker = getTaskScheduler()
506-
.scheduleAtFixedRate(checkUnconfirmed(), this.confirmTimeout.dividedBy(2L));
507-
}
508-
this.running = true;
509523
}
524+
finally {
525+
this.lock.unlock();
526+
}
527+
510528
}
511529

512530
private Runnable checkUnconfirmed() {
@@ -526,14 +544,20 @@ private Runnable checkUnconfirmed() {
526544
protected abstract RabbitTemplate getRabbitTemplate();
527545

528546
@Override
529-
public synchronized void stop() {
530-
if (this.running) {
531-
doStop();
532-
}
533-
this.running = false;
534-
if (this.confirmChecker != null) {
535-
this.confirmChecker.cancel(false);
536-
this.confirmChecker = null;
547+
public void stop() {
548+
this.lock.lock();
549+
try {
550+
if (this.running) {
551+
doStop();
552+
}
553+
this.running = false;
554+
if (this.confirmChecker != null) {
555+
this.confirmChecker.cancel(false);
556+
this.confirmChecker = null;
557+
}
558+
}
559+
finally {
560+
this.lock.unlock();
537561
}
538562
}
539563

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.Set;
2727
import java.util.concurrent.ConcurrentHashMap;
2828
import java.util.concurrent.CopyOnWriteArrayList;
29+
import java.util.concurrent.locks.Lock;
30+
import java.util.concurrent.locks.ReentrantLock;
2931

3032
import io.micrometer.observation.ObservationRegistry;
3133

@@ -70,6 +72,7 @@
7072
* @author Oleg Zhurakousky
7173
* @author Gary Russell
7274
* @author Artem Bilan
75+
* @author Christian Tzolov
7376
*/
7477
@IntegrationManagedResource
7578
public abstract class AbstractMessageChannel extends IntegrationObjectSupport
@@ -475,6 +478,8 @@ public void destroy() {
475478
*/
476479
protected static class ChannelInterceptorList {
477480

481+
private final Lock lock = new ReentrantLock();
482+
478483
protected final List<ChannelInterceptor> interceptors = new CopyOnWriteArrayList<>(); // NOSONAR
479484

480485
private final LogAccessor logger;
@@ -486,11 +491,15 @@ public ChannelInterceptorList(LogAccessor logger) {
486491
}
487492

488493
public boolean set(List<ChannelInterceptor> interceptors) {
489-
synchronized (this.interceptors) {
494+
this.lock.lock();
495+
try {
490496
this.interceptors.clear();
491497
this.size = interceptors.size();
492498
return this.interceptors.addAll(interceptors);
493499
}
500+
finally {
501+
this.lock.unlock();
502+
}
494503
}
495504

496505
public int getSize() {

spring-integration-core/src/main/java/org/springframework/integration/channel/DefaultHeaderChannelRegistry.java

Lines changed: 63 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2022 the original author or authors.
2+
* Copyright 2013-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,8 @@
2424
import java.util.concurrent.ConcurrentHashMap;
2525
import java.util.concurrent.ScheduledFuture;
2626
import java.util.concurrent.atomic.AtomicLong;
27+
import java.util.concurrent.locks.Lock;
28+
import java.util.concurrent.locks.ReentrantLock;
2729

2830
import org.springframework.integration.context.IntegrationObjectSupport;
2931
import org.springframework.integration.support.channel.HeaderChannelRegistry;
@@ -44,6 +46,7 @@
4446
* @author Gary Russell
4547
* @author Artem Bilan
4648
* @author Trung Pham
49+
* @author Christian Tzolov
4750
*
4851
* @since 3.0
4952
*
@@ -69,6 +72,8 @@ public class DefaultHeaderChannelRegistry extends IntegrationObjectSupport
6972

7073
private volatile boolean explicitlyStopped;
7174

75+
private final Lock lock = new ReentrantLock();
76+
7277
/**
7378
* Construct a registry with the default delay for channel expiry.
7479
*/
@@ -120,25 +125,37 @@ protected void onInit() {
120125
}
121126

122127
@Override
123-
public synchronized void start() {
124-
if (!this.running) {
125-
Assert.notNull(getTaskScheduler(), "a task scheduler is required");
126-
this.reaperScheduledFuture =
127-
getTaskScheduler()
128-
.schedule(this, Instant.now().plusMillis(this.reaperDelay));
129-
130-
this.running = true;
128+
public void start() {
129+
this.lock.lock();
130+
try {
131+
if (!this.running) {
132+
Assert.notNull(getTaskScheduler(), "a task scheduler is required");
133+
this.reaperScheduledFuture = getTaskScheduler()
134+
.schedule(this, Instant.now().plusMillis(this.reaperDelay));
135+
136+
this.running = true;
137+
}
138+
}
139+
finally {
140+
this.lock.unlock();
131141
}
132142
}
133143

134144
@Override
135-
public synchronized void stop() {
136-
this.running = false;
137-
if (this.reaperScheduledFuture != null) {
138-
this.reaperScheduledFuture.cancel(true);
139-
this.reaperScheduledFuture = null;
145+
public void stop() {
146+
this.lock.lock();
147+
try {
148+
this.running = false;
149+
if (this.reaperScheduledFuture != null) {
150+
this.reaperScheduledFuture.cancel(true);
151+
this.reaperScheduledFuture = null;
152+
}
153+
this.explicitlyStopped = true;
154+
}
155+
finally {
156+
this.lock.unlock();
140157
}
141-
this.explicitlyStopped = true;
158+
142159
}
143160

144161
public void stop(Runnable callback) {
@@ -200,35 +217,45 @@ public MessageChannel channelNameToChannel(@Nullable String name) {
200217
* Cancel the scheduled reap task and run immediately; then reschedule.
201218
*/
202219
@Override
203-
public synchronized void runReaper() {
204-
if (this.reaperScheduledFuture != null) {
205-
this.reaperScheduledFuture.cancel(true);
206-
this.reaperScheduledFuture = null;
207-
}
220+
public void runReaper() {
221+
this.lock.lock();
222+
try {
223+
if (this.reaperScheduledFuture != null) {
224+
this.reaperScheduledFuture.cancel(true);
225+
this.reaperScheduledFuture = null;
226+
}
208227

209-
run();
228+
run();
229+
}
230+
finally {
231+
this.lock.unlock();
232+
}
210233
}
211234

212235
@Override
213-
public synchronized void run() {
214-
logger.trace(() -> "Reaper started; channels size=" + this.channels.size());
215-
Iterator<Entry<String, MessageChannelWrapper>> iterator = this.channels.entrySet().iterator();
216-
long now = System.currentTimeMillis();
217-
while (iterator.hasNext()) {
218-
Entry<String, MessageChannelWrapper> entry = iterator.next();
219-
if (entry.getValue().expireAt() < now) {
220-
logger.debug(() -> "Expiring " + entry.getKey() + " (" + entry.getValue().channel() + ")");
221-
iterator.remove();
236+
public void run() {
237+
this.lock.lock();
238+
try {
239+
logger.trace(() -> "Reaper started; channels size=" + this.channels.size());
240+
Iterator<Entry<String, MessageChannelWrapper>> iterator = this.channels.entrySet().iterator();
241+
long now = System.currentTimeMillis();
242+
while (iterator.hasNext()) {
243+
Entry<String, MessageChannelWrapper> entry = iterator.next();
244+
if (entry.getValue().expireAt() < now) {
245+
logger.debug(() -> "Expiring " + entry.getKey() + " (" + entry.getValue().channel() + ")");
246+
iterator.remove();
247+
}
222248
}
223-
}
224-
this.reaperScheduledFuture =
225-
getTaskScheduler()
226-
.schedule(this, Instant.now().plusMillis(this.reaperDelay));
249+
this.reaperScheduledFuture = getTaskScheduler()
250+
.schedule(this, Instant.now().plusMillis(this.reaperDelay));
227251

228-
logger.trace(() -> "Reaper completed; channels size=" + this.channels.size());
252+
logger.trace(() -> "Reaper completed; channels size=" + this.channels.size());
253+
}
254+
finally {
255+
this.lock.unlock();
256+
}
229257
}
230258

231-
232259
protected record MessageChannelWrapper(MessageChannel channel, long expireAt) {
233260

234261
}

spring-integration-core/src/main/java/org/springframework/integration/config/AbstractSimpleMessageHandlerFactoryBean.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,8 @@
1717
package org.springframework.integration.config;
1818

1919
import java.util.List;
20+
import java.util.concurrent.locks.Lock;
21+
import java.util.concurrent.locks.ReentrantLock;
2022

2123
import org.aopalliance.aop.Advice;
2224
import org.apache.commons.logging.Log;
@@ -58,14 +60,15 @@
5860
* @author Gary Russell
5961
* @author Artem Bilan
6062
* @author David Liu
63+
* @author Christian Tzolov
6164
*/
6265
public abstract class AbstractSimpleMessageHandlerFactoryBean<H extends MessageHandler>
6366
implements FactoryBean<MessageHandler>, ApplicationContextAware, BeanFactoryAware, BeanNameAware,
6467
ApplicationEventPublisherAware {
6568

6669
protected final Log logger = LogFactory.getLog(getClass()); //NOSONAR protected with final
6770

68-
private final Object initializationMonitor = new Object();
71+
private final Lock initializationMonitor = new ReentrantLock();
6972

7073
private BeanFactory beanFactory;
7174

@@ -192,7 +195,8 @@ public H getObject() {
192195
}
193196

194197
protected final H createHandlerInternal() {
195-
synchronized (this.initializationMonitor) {
198+
this.initializationMonitor.lock();
199+
try {
196200
if (this.initialized) {
197201
// There was a problem when this method was called already
198202
return null;
@@ -228,6 +232,9 @@ protected final H createHandlerInternal() {
228232
this.order, theOrder -> ((Orderable) this.handler).setOrder(theOrder));
229233
this.initialized = true;
230234
}
235+
finally {
236+
this.initializationMonitor.unlock();
237+
}
231238
initializingBean();
232239
return this.handler;
233240
}

0 commit comments

Comments
 (0)