Skip to content

Commit 7e77ae3

Browse files
garyrussellartembilan
authored andcommitted
Add returned message to correlation data
If confirms and returns are enabled, a returned message is sent before the ack; add the returned message to the correlation data, if provided. * Polishing - PR Comments * Polishing - PR comments - use the id field for correlation instead of adding a new field.
1 parent 4afefc4 commit 7e77ae3

File tree

7 files changed

+114
-10
lines changed

7 files changed

+114
-10
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -873,9 +873,11 @@ public void send(final String exchange, final String routingKey,
873873
final Message message, final CorrelationData correlationData)
874874
throws AmqpException {
875875
execute(channel -> {
876-
doSend(channel, exchange, routingKey, message, RabbitTemplate.this.returnCallback != null
876+
doSend(channel, exchange, routingKey, message,
877+
(RabbitTemplate.this.returnCallback != null
878+
|| (correlationData != null && StringUtils.hasText(correlationData.getId())))
877879
&& RabbitTemplate.this.mandatoryExpression.getValue(
878-
RabbitTemplate.this.evaluationContext, message, Boolean.class),
880+
RabbitTemplate.this.evaluationContext, message, Boolean.class),
879881
correlationData);
880882
return null;
881883
}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
@@ -1987,7 +1989,7 @@ public void doSend(Channel channel, String exchange, String routingKey, Message
19871989
Message messageToUse = message;
19881990
MessageProperties messageProperties = messageToUse.getMessageProperties();
19891991
if (mandatory) {
1990-
messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_CORRELATION_KEY, this.uuid);
1992+
messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
19911993
}
19921994
if (this.beforePublishPostProcessors != null) {
19931995
for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
@@ -2026,6 +2028,10 @@ private void setupConfirm(Channel channel, Message message, CorrelationData corr
20262028
message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
20272029
publisherCallbackChannel.addPendingConfirm(this, nextPublishSeqNo,
20282030
new PendingConfirm(correlationData, System.currentTimeMillis()));
2031+
if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
2032+
message.getMessageProperties().setHeader(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY,
2033+
correlationData.getId());
2034+
}
20292035
}
20302036
else if (channel instanceof ChannelProxy && ((ChannelProxy) channel).isConfirmSelected()) {
20312037
long nextPublishSeqNo = channel.getNextPublishSeqNo();
@@ -2178,7 +2184,7 @@ else if (logger.isWarnEnabled()) {
21782184
}
21792185
}
21802186
if (returnCallback != null) {
2181-
properties.getHeaders().remove(PublisherCallbackChannel.RETURN_CORRELATION_KEY);
2187+
properties.getHeaders().remove(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY);
21822188
MessageProperties messageProperties = this.messagePropertiesConverter.toMessageProperties(
21832189
properties, null, this.encoding);
21842190
Message returnedMessage = new Message(body, messageProperties);

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/CorrelationData.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.amqp.rabbit.support;
1818

1919
import org.springframework.amqp.core.Correlation;
20+
import org.springframework.amqp.core.Message;
2021
import org.springframework.lang.Nullable;
2122
import org.springframework.util.concurrent.SettableListenableFuture;
2223

@@ -36,6 +37,8 @@ public class CorrelationData implements Correlation {
3637

3738
private volatile String id;
3839

40+
private volatile Message returnedMessage;
41+
3942
/**
4043
* Construct an instance with a null Id.
4144
* @since 1.6.7
@@ -45,7 +48,8 @@ public CorrelationData() {
4548
}
4649

4750
/**
48-
* Construct an instance with the supplied id.
51+
* Construct an instance with the supplied id. Must be unique if returns are enabled
52+
* to allow population of the {@link #setReturnedMessage(Message) returnedMessage}.
4953
* @param id the id.
5054
*/
5155
public CorrelationData(String id) {
@@ -70,10 +74,31 @@ public void setId(String id) {
7074
this.id = id;
7175
}
7276

77+
/**
78+
* Return a future to check the success/failure of the publish operation.
79+
* @return the future.
80+
* @since 2.1
81+
*/
7382
public SettableListenableFuture<Confirm> getFuture() {
7483
return this.future;
7584
}
7685

86+
/**
87+
* Return a returned message, if any; requires a unique
88+
* {@link #CorrelationData(String) id}. Guaranteed to be populated before the future
89+
* is set.
90+
* @return the message or null.
91+
* @since 2.1
92+
*/
93+
@Nullable
94+
public Message getReturnedMessage() {
95+
return this.returnedMessage;
96+
}
97+
98+
void setReturnedMessage(Message returnedMessage) {
99+
this.returnedMessage = returnedMessage;
100+
}
101+
77102
@Override
78103
public String toString() {
79104
return "CorrelationData [id=" + this.id + "]";

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/PublisherCallbackChannel.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,15 @@
3535
*/
3636
public interface PublisherCallbackChannel extends Channel {
3737

38-
String RETURN_CORRELATION_KEY = "spring_listener_return_correlation";
38+
/**
39+
* Header used to determine which listener to invoke for a returned message.
40+
*/
41+
String RETURN_LISTENER_CORRELATION_KEY = "spring_listener_return_correlation";
42+
43+
/**
44+
* Header used to locate a pending confirm to which to attach a returned message.
45+
*/
46+
String RETURNED_MESSAGE_CORRELATION_KEY = "spring_returned_message_correlation";
3947

4048
/**
4149
* Adds a {@link Listener}.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/PublisherCallbackChannelImpl.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.amqp.rabbit.support;
1818

1919
import java.io.IOException;
20+
import java.nio.charset.StandardCharsets;
2021
import java.util.ArrayList;
2122
import java.util.Collection;
2223
import java.util.Collections;
@@ -38,8 +39,11 @@
3839
import org.apache.commons.logging.Log;
3940
import org.apache.commons.logging.LogFactory;
4041

42+
import org.springframework.amqp.core.Message;
43+
import org.springframework.amqp.core.MessageProperties;
4144
import org.springframework.amqp.rabbit.support.CorrelationData.Confirm;
4245
import org.springframework.util.Assert;
46+
import org.springframework.util.StringUtils;
4347

4448
import com.rabbitmq.client.AMQP;
4549
import com.rabbitmq.client.AMQP.Basic.RecoverOk;
@@ -63,7 +67,9 @@
6367
import com.rabbitmq.client.Consumer;
6468
import com.rabbitmq.client.ConsumerShutdownSignalCallback;
6569
import com.rabbitmq.client.DeliverCallback;
70+
import com.rabbitmq.client.Envelope;
6671
import com.rabbitmq.client.GetResponse;
72+
import com.rabbitmq.client.LongString;
6773
import com.rabbitmq.client.Method;
6874
import com.rabbitmq.client.ReturnCallback;
6975
import com.rabbitmq.client.ReturnListener;
@@ -84,6 +90,8 @@ public class PublisherCallbackChannelImpl
8490

8591
private static final ExecutorService DEFAULT_EXECUTOR = Executors.newSingleThreadExecutor();
8692

93+
private static final MessagePropertiesConverter converter = new DefaultMessagePropertiesConverter();
94+
8795
private final Log logger = LogFactory.getLog(this.getClass());
8896

8997
private final Channel delegate;
@@ -92,12 +100,14 @@ public class PublisherCallbackChannelImpl
92100

93101
private final Map<Listener, SortedMap<Long, PendingConfirm>> pendingConfirms = new ConcurrentHashMap<>();
94102

95-
private final SortedMap<Long, Listener> listenerForSeq = new ConcurrentSkipListMap<>();
103+
private final Map<String, PendingConfirm> pendingReturns = new ConcurrentHashMap<>();
96104

97-
private volatile java.util.function.Consumer<Channel> afterAckCallback;
105+
private final SortedMap<Long, Listener> listenerForSeq = new ConcurrentSkipListMap<>();
98106

99107
private final ExecutorService executor;
100108

109+
private volatile java.util.function.Consumer<Channel> afterAckCallback;
110+
101111
public PublisherCallbackChannelImpl(Channel delegate) {
102112
this(delegate, null);
103113
}
@@ -872,6 +882,10 @@ public synchronized Collection<PendingConfirm> expire(Listener listener, long cu
872882
if (pendingConfirm.getTimestamp() < cutoffTime) {
873883
expired.add(pendingConfirm);
874884
iterator.remove();
885+
CorrelationData correlationData = pendingConfirm.getCorrelationData();
886+
if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
887+
this.pendingReturns.remove(correlationData.getId());
888+
}
875889
}
876890
else {
877891
break;
@@ -935,6 +949,9 @@ private void doProcessAck(long seq, boolean ack, boolean multiple, boolean remov
935949
CorrelationData correlationData = value.getCorrelationData();
936950
if (correlationData != null) {
937951
correlationData.getFuture().set(new Confirm(ack, value.getCause()));
952+
if (StringUtils.hasText(correlationData.getId())) {
953+
this.pendingReturns.remove(correlationData.getId());
954+
}
938955
}
939956
iterator.remove();
940957
doHandleConfirm(ack, involvedListener, value);
@@ -961,6 +978,9 @@ private void doProcessAck(long seq, boolean ack, boolean multiple, boolean remov
961978
CorrelationData correlationData = pendingConfirm.getCorrelationData();
962979
if (correlationData != null) {
963980
correlationData.getFuture().set(new Confirm(ack, pendingConfirm.getCause()));
981+
if (StringUtils.hasText(correlationData.getId())) {
982+
this.pendingReturns.remove(correlationData.getId());
983+
}
964984
}
965985
doHandleConfirm(ack, listener, pendingConfirm);
966986
}
@@ -994,6 +1014,12 @@ public synchronized void addPendingConfirm(Listener listener, long seq, PendingC
9941014
"Listener not registered: " + listener + " " + this.pendingConfirms.keySet());
9951015
pendingConfirmsForListener.put(seq, pendingConfirm);
9961016
this.listenerForSeq.put(seq, listener);
1017+
if (pendingConfirm.getCorrelationData() != null) {
1018+
String returnCorrelation = pendingConfirm.getCorrelationData().getId();
1019+
if (StringUtils.hasText(returnCorrelation)) {
1020+
this.pendingReturns.put(returnCorrelation, pendingConfirm);
1021+
}
1022+
}
9971023
}
9981024

9991025
// ReturnListener
@@ -1005,7 +1031,16 @@ public void handleReturn(int replyCode,
10051031
String routingKey,
10061032
AMQP.BasicProperties properties,
10071033
byte[] body) throws IOException {
1008-
String uuidObject = properties.getHeaders().get(RETURN_CORRELATION_KEY).toString();
1034+
LongString returnCorrelation = (LongString) properties.getHeaders().get(RETURNED_MESSAGE_CORRELATION_KEY);
1035+
if (returnCorrelation != null) {
1036+
PendingConfirm confirm = this.pendingReturns.remove(returnCorrelation.toString());
1037+
if (confirm != null) {
1038+
MessageProperties messageProperties = converter.toMessageProperties(properties,
1039+
new Envelope(0L, false, exchange, routingKey), StandardCharsets.UTF_8.name());
1040+
confirm.getCorrelationData().setReturnedMessage(new Message(body, messageProperties));
1041+
}
1042+
}
1043+
String uuidObject = properties.getHeaders().get(RETURN_LISTENER_CORRELATION_KEY).toString();
10091044
Listener listener = this.listeners.get(uuidObject);
10101045
if (listener == null || !listener.isReturnListener()) {
10111046
if (this.logger.isWarnEnabled()) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,14 @@ public class RabbitTemplatePublisherCallbacksIntegrationTests {
109109

110110
private CachingConnectionFactory connectionFactoryWithReturnsEnabled;
111111

112+
private CachingConnectionFactory connectionFactoryWithConfirmsAndReturnsEnabled;
113+
112114
private RabbitTemplate templateWithConfirmsEnabled;
113115

114116
private RabbitTemplate templateWithReturnsEnabled;
115117

118+
private RabbitTemplate templateWithConfirmsAndReturnsEnabled;
119+
116120
@Before
117121
public void create() {
118122
connectionFactory = new CachingConnectionFactory();
@@ -131,6 +135,14 @@ public void create() {
131135
connectionFactoryWithReturnsEnabled.setPort(BrokerTestUtils.getPort());
132136
connectionFactoryWithReturnsEnabled.setPublisherReturns(true);
133137
templateWithReturnsEnabled = new RabbitTemplate(connectionFactoryWithReturnsEnabled);
138+
connectionFactoryWithConfirmsAndReturnsEnabled = new CachingConnectionFactory();
139+
connectionFactoryWithConfirmsAndReturnsEnabled.setHost("localhost");
140+
connectionFactoryWithConfirmsAndReturnsEnabled.setChannelCacheSize(100);
141+
connectionFactoryWithConfirmsAndReturnsEnabled.setPort(BrokerTestUtils.getPort());
142+
connectionFactoryWithConfirmsAndReturnsEnabled.setPublisherConfirms(true);
143+
connectionFactoryWithConfirmsAndReturnsEnabled.setPublisherReturns(true);
144+
templateWithConfirmsAndReturnsEnabled = new RabbitTemplate(connectionFactoryWithConfirmsAndReturnsEnabled);
145+
templateWithConfirmsAndReturnsEnabled.setMandatory(true);
134146
}
135147

136148
@After
@@ -546,7 +558,7 @@ public void testPublisherConfirmMultipleWithTwoListeners() throws Exception {
546558
* time as adding a new pending ack to the map. Test verifies we don't
547559
* get a {@link ConcurrentModificationException}.
548560
*/
549-
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
561+
@SuppressWarnings({ "rawtypes", "unchecked" })
550562
@Test
551563
public void testConcurrentConfirms() throws Exception {
552564
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
@@ -824,6 +836,20 @@ public void testWithFuture() throws Exception {
824836
this.templateWithConfirmsEnabled.convertAndSend("", queue.getName(), "bar", cd2);
825837
// TODO: Uncomment when travis updates to rabbitmq 3.7
826838
// assertFalse(cd2.getFuture().get(10, TimeUnit.SECONDS).isAck());
839+
CorrelationData cd3 = new CorrelationData();
840+
this.templateWithConfirmsEnabled.convertAndSend("NO_EXCHANGE_HERE", queue.getName(), "foo", cd3);
841+
assertFalse(cd3.getFuture().get(10, TimeUnit.SECONDS).isAck());
842+
assertThat(cd3.getFuture().get().getReason(), containsString("NOT_FOUND"));
843+
CorrelationData cd4 = new CorrelationData("42");
844+
AtomicBoolean resent = new AtomicBoolean();
845+
this.templateWithConfirmsAndReturnsEnabled.setReturnCallback((m, r, rt, e, rk) -> {
846+
this.templateWithConfirmsEnabled.send(ROUTE, m);
847+
resent.set(true);
848+
});
849+
this.templateWithConfirmsAndReturnsEnabled.convertAndSend("", "NO_QUEUE_HERE", "foo", cd4);
850+
assertTrue(cd4.getFuture().get(10, TimeUnit.SECONDS).isAck());
851+
assertNotNull(cd4.getReturnedMessage());
852+
assertTrue(resent.get());
827853
admin.deleteQueue(queue.getName());
828854
}
829855

src/reference/asciidoc/amqp.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,9 @@ Since it's a `ListenableFuture<Confirm>`, you can either `get()` the result when
10001000
The `Confirm` object is a simple bean with 2 properties `ack` and `reason` (for nacks).
10011001
The reason is not populated for broker-generated nacks; it is populated for nacks generated by the framework (e.g. closing the connection while acks are outstanding).
10021002

1003+
In addition, when both confirms and returns are enabled, the `CorrelationData` is populated with the returned message.
1004+
It is guaranteed that this will occur before the future is set with the ack.
1005+
10031006
See also <<scoped-operations>> for a simpler mechanism for waiting for publisher confirms.
10041007

10051008
[[scoped-operations]]

src/reference/asciidoc/whats-new.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ The `RabbitTemplate` now can be configured with the `noLocalReplyConsumer` optio
4848
See <<request-reply>> for more information.
4949

5050
`CorrelationData` for publisher confirms now has a `ListenableFuture` which can be used to get the acknowledgment instead of using a callback.
51+
When returns and confirms are enabled, the correlation data, if provided, is populated with the returned message.
5152
See <<template-confirms>> for more information.
5253

5354
===== Message Convertion

0 commit comments

Comments
 (0)