19
19
import java .io .IOException ;
20
20
import java .util .ArrayList ;
21
21
import java .util .Arrays ;
22
+ import java .util .Collection ;
22
23
import java .util .Collections ;
23
24
import java .util .HashMap ;
24
25
import java .util .HashSet ;
30
31
import java .util .Set ;
31
32
import java .util .concurrent .BlockingQueue ;
32
33
import java .util .concurrent .ConcurrentHashMap ;
34
+ import java .util .concurrent .ConcurrentMap ;
33
35
import java .util .concurrent .LinkedBlockingQueue ;
34
36
import java .util .concurrent .TimeUnit ;
35
37
import java .util .concurrent .TimeoutException ;
36
38
import java .util .concurrent .atomic .AtomicBoolean ;
39
+ import java .util .stream .Collectors ;
37
40
38
41
import org .apache .commons .logging .Log ;
39
42
import org .apache .commons .logging .LogFactory ;
63
66
import com .rabbitmq .client .AMQP ;
64
67
import com .rabbitmq .client .AlreadyClosedException ;
65
68
import com .rabbitmq .client .Channel ;
66
- import com .rabbitmq .client .Consumer ;
67
69
import com .rabbitmq .client .DefaultConsumer ;
68
70
import com .rabbitmq .client .Envelope ;
69
71
import com .rabbitmq .client .Recoverable ;
@@ -103,7 +105,7 @@ public class BlockingQueueConsumer implements RecoveryListener {
103
105
104
106
private RabbitResourceHolder resourceHolder ;
105
107
106
- private InternalConsumer consumer ;
108
+ private final ConcurrentMap < String , InternalConsumer > consumers = new ConcurrentHashMap <>() ;
107
109
108
110
/**
109
111
* The flag indicating that consumer has been cancelled from all queues
@@ -129,8 +131,6 @@ public class BlockingQueueConsumer implements RecoveryListener {
129
131
130
132
private final boolean defaultRequeueRejected ;
131
133
132
- private final Map <String , String > consumerTags = new ConcurrentHashMap <String , String >();
133
-
134
134
private final Set <String > missingQueues = Collections .synchronizedSet (new HashSet <String >());
135
135
136
136
private long retryDeclarationInterval = 60000 ;
@@ -290,8 +290,11 @@ public Channel getChannel() {
290
290
return this .channel ;
291
291
}
292
292
293
- public String getConsumerTag () {
294
- return this .consumer .getConsumerTag ();
293
+ public Collection <String > getConsumerTags () {
294
+ return this .consumers .values ().stream ()
295
+ .map (c -> c .getConsumerTag ())
296
+ .filter (tag -> tag != null )
297
+ .collect (Collectors .toList ());
295
298
}
296
299
297
300
public void setShutdownTimeout (long shutdownTimeout ) {
@@ -394,8 +397,7 @@ protected void basicCancel() {
394
397
395
398
protected void basicCancel (boolean expected ) {
396
399
this .normalCancel = expected ;
397
- for (String consumerTag : this .consumerTags .keySet ()) {
398
- removeConsumer (consumerTag );
400
+ getConsumerTags ().forEach (consumerTag -> {
399
401
try {
400
402
if (this .channel .isOpen ()) {
401
403
this .channel .basicCancel (consumerTag );
@@ -411,7 +413,8 @@ protected void basicCancel(boolean expected) {
411
413
logger .trace (this .channel + " is already closed" );
412
414
}
413
415
}
414
- }
416
+ });
417
+ this .cancelled .set (true );
415
418
this .abortStarted = System .currentTimeMillis ();
416
419
}
417
420
@@ -455,7 +458,7 @@ private Message handle(Delivery delivery) throws InterruptedException {
455
458
MessageProperties messageProperties = this .messagePropertiesConverter .toMessageProperties (
456
459
delivery .getProperties (), envelope , "UTF-8" );
457
460
messageProperties .setConsumerTag (delivery .getConsumerTag ());
458
- messageProperties .setConsumerQueue (this . consumerTags . get ( delivery .getConsumerTag () ));
461
+ messageProperties .setConsumerQueue (delivery .getQueue ( ));
459
462
Message message = new Message (body , messageProperties );
460
463
if (logger .isDebugEnabled ()) {
461
464
logger .debug ("Received message: " + message );
@@ -573,7 +576,6 @@ public void start() throws AmqpException {
573
576
catch (AmqpAuthenticationException e ) {
574
577
throw new FatalListenerStartupException ("Authentication failure" , e );
575
578
}
576
- this .consumer = new InternalConsumer (this .channel );
577
579
this .deliveryTags .clear ();
578
580
this .activeObjectCounter .add (this );
579
581
@@ -666,13 +668,14 @@ private void addRecoveryListener() {
666
668
}
667
669
668
670
private void consumeFromQueue (String queue ) throws IOException {
671
+ InternalConsumer consumer = new InternalConsumer (this .channel , queue );
669
672
String consumerTag = this .channel .basicConsume (queue , this .acknowledgeMode .isAutoAck (),
670
673
(this .tagStrategy != null ? this .tagStrategy .createConsumerTag (queue ) : "" ), this .noLocal ,
671
674
this .exclusive , this .consumerArgs ,
672
- new ConsumerDecorator ( queue , this . consumer , this . applicationEventPublisher ) );
675
+ consumer );
673
676
674
677
if (consumerTag != null ) {
675
- this .consumerTags .put (consumerTag , queue );
678
+ this .consumers .put (queue , consumer );
676
679
if (logger .isDebugEnabled ()) {
677
680
logger .debug ("Started on queue '" + queue + "' with tag " + consumerTag + ": " + this );
678
681
}
@@ -718,15 +721,13 @@ private void attemptPassiveDeclarations() {
718
721
}
719
722
}
720
723
721
- public void stop () {
724
+ public synchronized void stop () {
722
725
if (this .abortStarted == 0 ) { // signal handle delivery to use offer
723
726
this .abortStarted = System .currentTimeMillis ();
724
727
}
725
- if (this .consumer != null && this .consumer .getChannel () != null && this .consumerTags .size () > 0
726
- && !this .cancelled .get ()) {
728
+ if (!this .cancelled ()) {
727
729
try {
728
- RabbitUtils .closeMessageConsumer (this .consumer .getChannel (), this .consumerTags .keySet (),
729
- this .transactional );
730
+ RabbitUtils .closeMessageConsumer (this .channel , getConsumerTags (), this .transactional );
730
731
}
731
732
catch (Exception e ) {
732
733
if (logger .isDebugEnabled ()) {
@@ -740,7 +741,7 @@ public void stop() {
740
741
RabbitUtils .setPhysicalCloseRequired (this .channel , true );
741
742
ConnectionFactoryUtils .releaseResources (this .resourceHolder );
742
743
this .deliveryTags .clear ();
743
- this .consumer = null ;
744
+ this .consumers . clear () ;
744
745
this .queue .clear (); // in case we still have a client thread blocked
745
746
}
746
747
@@ -780,22 +781,6 @@ public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception {
780
781
}
781
782
}
782
783
783
- /**
784
- * Remove the consumer and set cancelled if all are gone.
785
- * @param consumerTag the tag to remove.
786
- * @return true if consumers remain.
787
- */
788
- private boolean removeConsumer (String consumerTag ) {
789
- this .consumerTags .remove (consumerTag );
790
- if (this .consumerTags .isEmpty ()) {
791
- this .cancelled .set (true );
792
- return false ;
793
- }
794
- else {
795
- return true ;
796
- }
797
- }
798
-
799
784
/**
800
785
* Perform a commit or message acknowledgement, as appropriate.
801
786
* @param locallyTransacted Whether the channel is locally transacted.
@@ -861,14 +846,18 @@ public void handleRecoveryStarted(Recoverable recoverable) {
861
846
@ Override
862
847
public String toString () {
863
848
return "Consumer@" + ObjectUtils .getIdentityHexString (this ) + ": "
864
- + "tags=[" + (this .consumerTags .toString ()) + "], channel=" + this .channel
849
+ + "tags=[" + getConsumerTags ()
850
+ + "], channel=" + this .channel
865
851
+ ", acknowledgeMode=" + this .acknowledgeMode + " local queue size=" + this .queue .size ();
866
852
}
867
853
868
854
private final class InternalConsumer extends DefaultConsumer {
869
855
870
- InternalConsumer (Channel channel ) {
856
+ private final String queue ;
857
+
858
+ InternalConsumer (Channel channel , String queue ) {
871
859
super (channel );
860
+ this .queue = queue ;
872
861
}
873
862
874
863
@ Override
@@ -877,6 +866,10 @@ public void handleConsumeOk(String consumerTag) {
877
866
if (logger .isDebugEnabled ()) {
878
867
logger .debug ("ConsumeOK: " + BlockingQueueConsumer .this );
879
868
}
869
+ if (BlockingQueueConsumer .this .applicationEventPublisher != null ) {
870
+ BlockingQueueConsumer .this .applicationEventPublisher
871
+ .publishEvent (new ConsumeOkEvent (this , this .queue , consumerTag ));
872
+ }
880
873
}
881
874
882
875
@ Override
@@ -899,19 +892,23 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
899
892
public void handleCancel (String consumerTag ) throws IOException {
900
893
if (logger .isWarnEnabled ()) {
901
894
logger .warn ("Cancel received for " + consumerTag + " ("
902
- + BlockingQueueConsumer . this .consumerTags . get ( consumerTag )
895
+ + this .queue
903
896
+ "); " + BlockingQueueConsumer .this );
904
897
}
905
- if (removeConsumer (consumerTag )) {
898
+ BlockingQueueConsumer .this .consumers .remove (this .queue );
899
+ if (!BlockingQueueConsumer .this .consumers .isEmpty ()) {
906
900
basicCancel (false );
907
901
}
902
+ else {
903
+ BlockingQueueConsumer .this .cancelled .set (true );
904
+ }
908
905
}
909
906
910
907
@ Override
911
908
public void handleCancelOk (String consumerTag ) {
912
909
if (logger .isDebugEnabled ()) {
913
910
logger .debug ("Received cancelOk for tag " + consumerTag + " ("
914
- + BlockingQueueConsumer . this .consumerTags . get ( consumerTag )
911
+ + this .queue
915
912
+ "); " + BlockingQueueConsumer .this );
916
913
}
917
914
}
@@ -926,8 +923,10 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
926
923
}
927
924
try {
928
925
if (BlockingQueueConsumer .this .abortStarted > 0 ) {
929
- if (!BlockingQueueConsumer .this .queue .offer (new Delivery (consumerTag , envelope , properties , body ),
926
+ if (!BlockingQueueConsumer .this .queue .offer (
927
+ new Delivery (consumerTag , envelope , properties , body , this .queue ),
930
928
BlockingQueueConsumer .this .shutdownTimeout , TimeUnit .MILLISECONDS )) {
929
+
931
930
RabbitUtils .setPhysicalCloseRequired (getChannel (), true );
932
931
// Defensive - should never happen
933
932
BlockingQueueConsumer .this .queue .clear ();
@@ -942,73 +941,19 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
942
941
}
943
942
}
944
943
else {
945
- BlockingQueueConsumer .this .queue .put (new Delivery (consumerTag , envelope , properties , body ));
944
+ BlockingQueueConsumer .this .queue
945
+ .put (new Delivery (consumerTag , envelope , properties , body , this .queue ));
946
946
}
947
947
}
948
948
catch (InterruptedException e ) {
949
949
Thread .currentThread ().interrupt ();
950
950
}
951
951
}
952
952
953
- }
954
-
955
- private static final class ConsumerDecorator implements Consumer {
956
-
957
- private final String queue ;
958
-
959
- private final Consumer delegate ;
960
-
961
- private final ApplicationEventPublisher applicationEventPublisher ;
962
-
963
- private String consumerTag ;
964
-
965
- ConsumerDecorator (String queue , Consumer delegate , ApplicationEventPublisher applicationEventPublisher ) {
966
- this .queue = queue ;
967
- this .delegate = delegate ;
968
- this .applicationEventPublisher = applicationEventPublisher ;
969
- }
970
-
971
-
972
- @ Override
973
- public void handleConsumeOk (String consumerTag ) {
974
- this .consumerTag = consumerTag ;
975
- this .delegate .handleConsumeOk (consumerTag );
976
- if (this .applicationEventPublisher != null ) {
977
- this .applicationEventPublisher .publishEvent (new ConsumeOkEvent (this .delegate , this .queue , consumerTag ));
978
- }
979
- }
980
-
981
- @ Override
982
- public void handleShutdownSignal (String consumerTag , ShutdownSignalException sig ) {
983
- this .delegate .handleShutdownSignal (consumerTag , sig );
984
- }
985
-
986
- @ Override
987
- public void handleCancel (String consumerTag ) throws IOException {
988
- this .delegate .handleCancel (consumerTag );
989
- }
990
-
991
- @ Override
992
- public void handleCancelOk (String consumerTag ) {
993
- this .delegate .handleCancelOk (consumerTag );
994
- }
995
-
996
- @ Override
997
- public void handleDelivery (String consumerTag , Envelope envelope , AMQP .BasicProperties properties ,
998
- byte [] body ) throws IOException {
999
-
1000
- this .delegate .handleDelivery (consumerTag , envelope , properties , body );
1001
- }
1002
-
1003
- @ Override
1004
- public void handleRecoverOk (String consumerTag ) {
1005
- this .delegate .handleRecoverOk (consumerTag );
1006
- }
1007
-
1008
953
@ Override
1009
954
public String toString () {
1010
- return "ConsumerDecorator {" + "queue='" + this .queue + '\'' +
1011
- ", consumerTag='" + this . consumerTag + '\'' +
955
+ return "InternalConsumer {" + "queue='" + this .queue + '\'' +
956
+ ", consumerTag='" + getConsumerTag () + '\'' +
1012
957
'}' ;
1013
958
}
1014
959
0 commit comments