1
1
/*
2
- * Copyright 2002-2023 the original author or authors.
2
+ * Copyright 2002-2024 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
24
24
import java .util .LinkedHashSet ;
25
25
import java .util .List ;
26
26
import java .util .Set ;
27
+ import java .util .concurrent .locks .Lock ;
28
+ import java .util .concurrent .locks .ReentrantLock ;
27
29
28
30
import jakarta .jms .Connection ;
29
31
import jakarta .jms .ConnectionFactory ;
@@ -116,8 +118,8 @@ public class SingleConnectionFactory implements ConnectionFactory, QueueConnecti
116
118
/** Whether the shared Connection has been started. */
117
119
private int startedCount = 0 ;
118
120
119
- /** Synchronization monitor for the shared Connection. */
120
- private final Object connectionMonitor = new Object ();
121
+ /** Lifecycle lock for the shared Connection. */
122
+ private final Lock connectionLock = new ReentrantLock ();
121
123
122
124
123
125
/**
@@ -252,10 +254,14 @@ public Connection createConnection(String username, String password) throws JMSE
252
254
@ Override
253
255
public QueueConnection createQueueConnection () throws JMSException {
254
256
Connection con ;
255
- synchronized (this .connectionMonitor ) {
257
+ this .connectionLock .lock ();
258
+ try {
256
259
this .pubSubMode = Boolean .FALSE ;
257
260
con = createConnection ();
258
261
}
262
+ finally {
263
+ this .connectionLock .unlock ();
264
+ }
259
265
if (!(con instanceof QueueConnection queueConnection )) {
260
266
throw new jakarta .jms .IllegalStateException (
261
267
"This SingleConnectionFactory does not hold a QueueConnection but rather: " + con );
@@ -272,10 +278,14 @@ public QueueConnection createQueueConnection(String username, String password) t
272
278
@ Override
273
279
public TopicConnection createTopicConnection () throws JMSException {
274
280
Connection con ;
275
- synchronized (this .connectionMonitor ) {
281
+ this .connectionLock .lock ();
282
+ try {
276
283
this .pubSubMode = Boolean .TRUE ;
277
284
con = createConnection ();
278
285
}
286
+ finally {
287
+ this .connectionLock .unlock ();
288
+ }
279
289
if (!(con instanceof TopicConnection topicConnection )) {
280
290
throw new jakarta .jms .IllegalStateException (
281
291
"This SingleConnectionFactory does not hold a TopicConnection but rather: " + con );
@@ -323,12 +333,16 @@ private ConnectionFactory obtainTargetConnectionFactory() {
323
333
* @see #initConnection()
324
334
*/
325
335
protected Connection getConnection () throws JMSException {
326
- synchronized (this .connectionMonitor ) {
336
+ this .connectionLock .lock ();
337
+ try {
327
338
if (this .connection == null ) {
328
339
initConnection ();
329
340
}
330
341
return this .connection ;
331
342
}
343
+ finally {
344
+ this .connectionLock .unlock ();
345
+ }
332
346
}
333
347
334
348
/**
@@ -386,9 +400,13 @@ public void stop() {
386
400
*/
387
401
@ Override
388
402
public boolean isRunning () {
389
- synchronized (this .connectionMonitor ) {
403
+ this .connectionLock .lock ();
404
+ try {
390
405
return (this .connection != null );
391
406
}
407
+ finally {
408
+ this .connectionLock .unlock ();
409
+ }
392
410
}
393
411
394
412
@@ -404,7 +422,8 @@ public void initConnection() throws JMSException {
404
422
throw new IllegalStateException (
405
423
"'targetConnectionFactory' is required for lazily initializing a Connection" );
406
424
}
407
- synchronized (this .connectionMonitor ) {
425
+ this .connectionLock .lock ();
426
+ try {
408
427
if (this .connection != null ) {
409
428
closeConnection (this .connection );
410
429
}
@@ -433,6 +452,9 @@ public void initConnection() throws JMSException {
433
452
logger .debug ("Established shared JMS Connection: " + this .connection );
434
453
}
435
454
}
455
+ finally {
456
+ this .connectionLock .unlock ();
457
+ }
436
458
}
437
459
438
460
/**
@@ -531,12 +553,16 @@ else if (Boolean.TRUE.equals(this.pubSubMode) && con instanceof TopicConnection
531
553
* @see #closeConnection
532
554
*/
533
555
public void resetConnection () {
534
- synchronized (this .connectionMonitor ) {
556
+ this .connectionLock .lock ();
557
+ try {
535
558
if (this .connection != null ) {
536
559
closeConnection (this .connection );
537
560
}
538
561
this .connection = null ;
539
562
}
563
+ finally {
564
+ this .connectionLock .unlock ();
565
+ }
540
566
}
541
567
542
568
/**
@@ -634,7 +660,8 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
634
660
}
635
661
case "setExceptionListener" -> {
636
662
// Handle setExceptionListener method: add to the chain.
637
- synchronized (connectionMonitor ) {
663
+ connectionLock .lock ();
664
+ try {
638
665
if (aggregatedExceptionListener != null ) {
639
666
ExceptionListener listener = (ExceptionListener ) args [0 ];
640
667
if (listener != this .localExceptionListener ) {
@@ -656,16 +683,23 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
656
683
"which will allow for registering further ExceptionListeners to the recovery chain." );
657
684
}
658
685
}
686
+ finally {
687
+ connectionLock .unlock ();
688
+ }
659
689
}
660
690
case "getExceptionListener" -> {
661
- synchronized (connectionMonitor ) {
691
+ connectionLock .lock ();
692
+ try {
662
693
if (this .localExceptionListener != null ) {
663
694
return this .localExceptionListener ;
664
695
}
665
696
else {
666
697
return getExceptionListener ();
667
698
}
668
699
}
700
+ finally {
701
+ connectionLock .unlock ();
702
+ }
669
703
}
670
704
case "start" -> {
671
705
localStart ();
@@ -677,14 +711,18 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
677
711
}
678
712
case "close" -> {
679
713
localStop ();
680
- synchronized (connectionMonitor ) {
714
+ connectionLock .lock ();
715
+ try {
681
716
if (this .localExceptionListener != null ) {
682
717
if (aggregatedExceptionListener != null ) {
683
718
aggregatedExceptionListener .delegates .remove (this .localExceptionListener );
684
719
}
685
720
this .localExceptionListener = null ;
686
721
}
687
722
}
723
+ finally {
724
+ connectionLock .unlock ();
725
+ }
688
726
return null ;
689
727
}
690
728
case "createSession" , "createQueueSession" , "createTopicSession" -> {
@@ -727,7 +765,8 @@ else if (args.length == 2) {
727
765
}
728
766
729
767
private void localStart () throws JMSException {
730
- synchronized (connectionMonitor ) {
768
+ connectionLock .lock ();
769
+ try {
731
770
if (!this .locallyStarted ) {
732
771
this .locallyStarted = true ;
733
772
if (startedCount == 0 && connection != null ) {
@@ -736,10 +775,14 @@ private void localStart() throws JMSException {
736
775
startedCount ++;
737
776
}
738
777
}
778
+ finally {
779
+ connectionLock .unlock ();
780
+ }
739
781
}
740
782
741
783
private void localStop () throws JMSException {
742
- synchronized (connectionMonitor ) {
784
+ connectionLock .lock ();
785
+ try {
743
786
if (this .locallyStarted ) {
744
787
this .locallyStarted = false ;
745
788
if (startedCount == 1 && connection != null ) {
@@ -750,6 +793,9 @@ private void localStop() throws JMSException {
750
793
}
751
794
}
752
795
}
796
+ finally {
797
+ connectionLock .unlock ();
798
+ }
753
799
}
754
800
755
801
private SingleConnectionFactory factory () {
@@ -771,9 +817,13 @@ public void onException(JMSException ex) {
771
817
// Iterate over temporary copy in order to avoid ConcurrentModificationException,
772
818
// since listener invocations may in turn trigger registration of listeners...
773
819
Set <ExceptionListener > copy ;
774
- synchronized (connectionMonitor ) {
820
+ connectionLock .lock ();
821
+ try {
775
822
copy = new LinkedHashSet <>(this .delegates );
776
823
}
824
+ finally {
825
+ connectionLock .unlock ();
826
+ }
777
827
for (ExceptionListener listener : copy ) {
778
828
listener .onException (ex );
779
829
}
0 commit comments