Skip to content

Commit 1d38c14

Browse files
garyrussellartembilan
authored andcommitted
GH-719: Container Factory: Add replyPostProcessor
Resolves #719 Add `replyPostProcessor` to the container factory. * Polishing - support an array of MPPs for consistency with other places. * Polishing - PR Comments
1 parent 5ce8529 commit 1d38c14

File tree

6 files changed

+55
-13
lines changed

6 files changed

+55
-13
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
3232
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
3333
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
34+
import org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener;
3435
import org.springframework.amqp.support.ConsumerTagStrategy;
3536
import org.springframework.amqp.support.converter.MessageConverter;
3637
import org.springframework.beans.BeansException;
@@ -103,6 +104,8 @@ public abstract class AbstractRabbitListenerContainerFactory<C extends AbstractM
103104

104105
private MessagePostProcessor[] afterReceivePostProcessors;
105106

107+
private MessagePostProcessor[] beforeSendReplyPostProcessors;
108+
106109
protected final AtomicInteger counter = new AtomicInteger();
107110

108111
/**
@@ -274,13 +277,23 @@ public void setPhase(int phase) {
274277
}
275278

276279
/**
280+
* Set post processors which will be applied after the Message is received.
277281
* @param afterReceivePostProcessors the post processors.
282+
* @since 2.0
278283
* @see AbstractMessageListenerContainer#setAfterReceivePostProcessors(MessagePostProcessor...)
279284
*/
280285
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) {
281286
this.afterReceivePostProcessors = afterReceivePostProcessors;
282287
}
283288

289+
/**
290+
* Set post processors that will be applied before sending replies.
291+
* @param beforeSendReplyPostProcessors the post processors.
292+
* @since 2.0.3
293+
*/
294+
public void setBeforeSendReplyPostProcessors(MessagePostProcessor... beforeSendReplyPostProcessors) {
295+
this.beforeSendReplyPostProcessors = beforeSendReplyPostProcessors;
296+
}
284297

285298
@Override
286299
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
@@ -355,6 +368,11 @@ else if (this.autoStartup != null) {
355368
instance.setListenerId(endpoint.getId());
356369

357370
endpoint.setupListenerContainer(instance);
371+
if (this.beforeSendReplyPostProcessors != null
372+
&& instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {
373+
((AbstractAdaptableMessageListener) instance.getMessageListener())
374+
.setBeforeSendReplyPostProcessors(this.beforeSendReplyPostProcessors);
375+
}
358376
initializeContainer(instance, endpoint);
359377

360378
return instance;

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.amqp.rabbit.listener.adapter;
1818

19+
import java.util.Arrays;
20+
1921
import org.apache.commons.logging.Log;
2022
import org.apache.commons.logging.LogFactory;
2123

@@ -88,7 +90,7 @@ public abstract class AbstractAdaptableMessageListener implements MessageListene
8890

8991
private String encoding = DEFAULT_ENCODING;
9092

91-
private MessagePostProcessor replyPostProcessor;
93+
private MessagePostProcessor[] beforeSendReplyPostProcessors;
9294

9395
/**
9496
* Set the routing key to use when sending response messages.
@@ -173,12 +175,26 @@ public void setMessageConverter(MessageConverter messageConverter) {
173175
}
174176

175177
/**
176-
* Set a post processor to process the reply immediately before {@code Channel#basicPublish()}.
177-
* Often used to compress the data.
178+
* Set a post processor to process the reply immediately before
179+
* {@code Channel#basicPublish()}. Often used to compress the data.
178180
* @param replyPostProcessor the reply post processor.
181+
* @deprecated in favor of
182+
* {@link #setBeforeSendReplyPostProcessors(MessagePostProcessor...)}.
179183
*/
184+
@Deprecated
180185
public void setReplyPostProcessor(MessagePostProcessor replyPostProcessor) {
181-
this.replyPostProcessor = replyPostProcessor;
186+
setBeforeSendReplyPostProcessors(replyPostProcessor);
187+
}
188+
189+
/**
190+
* Set post processors that will be applied before sending replies.
191+
* @param beforeSendReplyPostProcessors the post processors.
192+
* @since 2.0.3
193+
*/
194+
public void setBeforeSendReplyPostProcessors(MessagePostProcessor... beforeSendReplyPostProcessors) {
195+
Assert.noNullElements(beforeSendReplyPostProcessors, "'replyPostProcessors' must not have any null elements");
196+
this.beforeSendReplyPostProcessors = Arrays.copyOf(beforeSendReplyPostProcessors,
197+
beforeSendReplyPostProcessors.length);
182198
}
183199

184200
/**
@@ -414,12 +430,11 @@ private Address evaluateReplyTo(Message request, Object source, Object result, E
414430
* @see #postProcessResponse(Message, Message)
415431
*/
416432
protected void sendResponse(Channel channel, Address replyTo, Message messageIn) throws Exception {
417-
Message message;
418-
if (this.replyPostProcessor == null) {
419-
message = messageIn;
420-
}
421-
else {
422-
message = this.replyPostProcessor.postProcessMessage(messageIn);
433+
Message message = messageIn;
434+
if (this.beforeSendReplyPostProcessors != null) {
435+
for (MessagePostProcessor postProcessor : this.beforeSendReplyPostProcessors) {
436+
message = postProcessor.postProcessMessage(message);
437+
}
423438
}
424439
postProcessChannel(channel, message);
425440

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ public void endpointWithHeader() {
370370
Message request = MessageTestUtils.createTextMessage("foo", properties);
371371
Message reply = rabbitTemplate.sendAndReceive("test.header", request);
372372
assertEquals("prefix-FOO", MessageTestUtils.extractText(reply));
373+
assertEquals(reply.getMessageProperties().getHeaders().get("replyMPPApplied"), Boolean.TRUE);
373374
}
374375

375376
@Test
@@ -1105,6 +1106,10 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
11051106
factory.setErrorHandler(errorHandler());
11061107
factory.setConsumerTagStrategy(consumerTagStrategy());
11071108
factory.setReceiveTimeout(10L);
1109+
factory.setBeforeSendReplyPostProcessors(m -> {
1110+
m.getMessageProperties().getHeaders().put("replyMPPApplied", true);
1111+
return m;
1112+
});
11081113
return factory;
11091114
}
11101115

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1382,7 +1382,7 @@ public String handleMessage(String message) {
13821382
return message.toUpperCase();
13831383
}
13841384
});
1385-
messageListener.setReplyPostProcessor(new GZipPostProcessor());
1385+
messageListener.setBeforeSendReplyPostProcessors(new GZipPostProcessor());
13861386
container.setMessageListener(messageListener);
13871387
container.setReceiveTimeout(100);
13881388
container.afterPropertiesSet();

src/reference/asciidoc/amqp.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1850,6 +1850,8 @@ It is possible to customize the listener container factory to use per annotation
18501850
The default is only required if at least one endpoint is registered without a specific container factory.
18511851
See the javadoc for full details and examples.
18521852

1853+
The container factories provide methods for adding `MessagePostProcessor` s that will be applied after receiving messages (before invoking the listener) and before sending replies.
1854+
18531855
If you prefer XML configuration, use the `<rabbit:annotation-driven>` element; any beans annotated with `@RabbitListener` will be detected.
18541856

18551857
For `SimpleRabbitListenerContainer` s:

src/reference/asciidoc/whats-new.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ You can now set the `concurrency` of the listener container at the annotation le
141141

142142
You can now set the `autoStartup` property of the listener container at the annotation level, overriding the default setting in the container factory.
143143

144+
You can now set after receive and before send (reply) `MessagePostProcessor` s in the `RabbitListener` container factories.
145+
144146
See <<async-annotation-driven>> for more information.
145147

146148
Starting with _version 2.0.3_, one of the `@RabbitHandler` s on a class-level `@RabbitListener` can be designated as the default.

0 commit comments

Comments
 (0)