Skip to content

Commit bf32231

Browse files
authored
GH-1455: AdviceChain on Stream Listener Container
Resolves #1455 Add an advice chain to the stream listener container and its factory. Add a `StreamMessageRecoverer` for native stream messages. Add a retry interceptor to work with native stream messages. **cherry-pick to 2.4.x** * Add since to new setter.
1 parent 9d49f20 commit bf32231

File tree

13 files changed

+427
-74
lines changed

13 files changed

+427
-74
lines changed

Diff for: spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/StreamRabbitListenerContainerFactory.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,12 +18,15 @@
1818

1919
import java.lang.reflect.Method;
2020

21+
import org.aopalliance.aop.Advice;
22+
2123
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
2224
import org.springframework.amqp.rabbit.config.BaseRabbitListenerContainerFactory;
2325
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
2426
import org.springframework.amqp.rabbit.listener.MethodRabbitListenerEndpoint;
2527
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
2628
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
29+
import org.springframework.amqp.utils.JavaUtils;
2730
import org.springframework.lang.Nullable;
2831
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
2932
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
@@ -96,9 +99,10 @@ public StreamListenerContainer createListenerContainer(RabbitListenerEndpoint en
9699
});
97100
}
98101
StreamListenerContainer container = createContainerInstance();
99-
if (this.consumerCustomizer != null) {
100-
container.setConsumerCustomizer(this.consumerCustomizer);
101-
}
102+
Advice[] adviceChain = getAdviceChain();
103+
JavaUtils.INSTANCE
104+
.acceptIfNotNull(this.consumerCustomizer, container::setConsumerCustomizer)
105+
.acceptIfNotNull(adviceChain, container::setAdviceChain);
102106
applyCommonOverrides(endpoint, container);
103107
if (this.containerCustomizer != null) {
104108
this.containerCustomizer.configure(container);

Diff for: spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java

+46-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,13 +16,16 @@
1616

1717
package org.springframework.rabbit.stream.listener;
1818

19+
import org.aopalliance.aop.Advice;
1920
import org.apache.commons.logging.Log;
2021
import org.apache.commons.logging.LogFactory;
2122

2223
import org.springframework.amqp.core.Message;
2324
import org.springframework.amqp.core.MessageListener;
2425
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
2526
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
27+
import org.springframework.aop.framework.ProxyFactory;
28+
import org.springframework.aop.support.DefaultPointcutAdvisor;
2629
import org.springframework.beans.factory.BeanNameAware;
2730
import org.springframework.lang.Nullable;
2831
import org.springframework.rabbit.stream.support.StreamMessageProperties;
@@ -62,6 +65,10 @@ public class StreamListenerContainer implements MessageListenerContainer, BeanNa
6265

6366
private MessageListener messageListener;
6467

68+
private StreamMessageListener streamListener;
69+
70+
private Advice[] adviceChain;
71+
6572
/**
6673
* Construct an instance using the provided environment.
6774
* @param environment the environment.
@@ -154,6 +161,18 @@ public void setAutoStartup(boolean autoStart) {
154161
public boolean isAutoStartup() {
155162
return this.autoStartup;
156163
}
164+
165+
/**
166+
* Set an advice chain to apply to the listener.
167+
* @param advices the advice chain.
168+
* @since 2.4.5
169+
*/
170+
public void setAdviceChain(Advice... advices) {
171+
Assert.notNull(advices, "'advices' cannot be null");
172+
Assert.noNullElements(advices, "'advices' cannot have null elements");
173+
this.adviceChain = advices;
174+
}
175+
157176
@Override
158177
@Nullable
159178
public Object getMessageListener() {
@@ -183,26 +202,46 @@ public synchronized void stop() {
183202

184203
@Override
185204
public void setupMessageListener(MessageListener messageListener) {
186-
this.messageListener = messageListener;
205+
adviseIfNeeded(messageListener);
187206
this.builder.messageHandler((context, message) -> {
188-
if (messageListener instanceof StreamMessageListener) {
189-
((StreamMessageListener) messageListener).onStreamMessage(message, context);
207+
if (this.streamListener != null) {
208+
this.streamListener.onStreamMessage(message, context);
190209
}
191210
else {
192211
Message message2 = this.streamConverter.toMessage(message, new StreamMessageProperties(context));
193-
if (messageListener instanceof ChannelAwareMessageListener) {
212+
if (this.messageListener instanceof ChannelAwareMessageListener) {
194213
try {
195-
((ChannelAwareMessageListener) messageListener).onMessage(message2, null);
214+
((ChannelAwareMessageListener) this.messageListener).onMessage(message2, null);
196215
}
197216
catch (Exception e) { // NOSONAR
198217
this.logger.error("Listner threw an exception", e);
199218
}
200219
}
201220
else {
202-
messageListener.onMessage(message2);
221+
this.messageListener.onMessage(message2);
203222
}
204223
}
205224
});
206225
}
207226

227+
private void adviseIfNeeded(MessageListener messageListener) {
228+
this.messageListener = messageListener;
229+
if (messageListener instanceof StreamMessageListener) {
230+
this.streamListener = (StreamMessageListener) messageListener;
231+
}
232+
if (this.adviceChain != null && this.adviceChain.length > 0) {
233+
ProxyFactory factory = new ProxyFactory(messageListener);
234+
for (Advice advice : this.adviceChain) {
235+
factory.addAdvisor(new DefaultPointcutAdvisor(advice));
236+
}
237+
factory.setInterfaces(messageListener.getClass().getInterfaces());
238+
if (this.streamListener != null) {
239+
this.streamListener = (StreamMessageListener) factory.getProxy(getClass().getClassLoader());
240+
}
241+
else {
242+
this.messageListener = (MessageListener) factory.getProxy(getClass().getClassLoader());
243+
}
244+
}
245+
}
246+
208247
}

Diff for: spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/adapter/StreamMessageListenerAdapter.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import org.springframework.amqp.rabbit.listener.adapter.InvocationResult;
2222
import org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter;
2323
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
24+
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
2425
import org.springframework.rabbit.stream.listener.StreamMessageListener;
2526

2627
import com.rabbitmq.stream.Message;
@@ -60,7 +61,7 @@ public void onStreamMessage(Message message, Context context) {
6061
}
6162
}
6263
catch (Exception ex) {
63-
this.logger.error("Failed to invoke listener", ex);
64+
throw new ListenerExecutionFailedException("Failed to invoke listener", ex);
6465
}
6566
}
6667

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.retry;
18+
19+
import org.springframework.amqp.core.Message;
20+
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
21+
22+
import com.rabbitmq.stream.MessageHandler.Context;
23+
24+
/**
25+
* Implementations of this interface can handle failed messages after retries are
26+
* exhausted.
27+
*
28+
* @author Gary Russell
29+
* @since 2.4.5
30+
*
31+
*/
32+
@FunctionalInterface
33+
public interface StreamMessageRecoverer extends MessageRecoverer {
34+
35+
@Override
36+
default void recover(Message message, Throwable cause) {
37+
}
38+
39+
/**
40+
* Callback for message that was consumed but failed all retry attempts.
41+
*
42+
* @param message the message to recover.
43+
* @param context the context.
44+
* @param cause the cause of the error.
45+
*/
46+
void recover(com.rabbitmq.stream.Message message, Context context, Throwable cause);
47+
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.retry;
18+
19+
import org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean;
20+
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
21+
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
22+
import org.springframework.retry.RetryOperations;
23+
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
24+
import org.springframework.retry.support.RetryTemplate;
25+
26+
import com.rabbitmq.stream.Message;
27+
import com.rabbitmq.stream.MessageHandler.Context;
28+
29+
/**
30+
* Convenient factory bean for creating a stateless retry interceptor for use in a
31+
* {@link StreamListenerContainer} when consuming native stream messages, giving you a
32+
* large amount of control over the behavior of a container when a listener fails. To
33+
* control the number of retry attempt or the backoff in between attempts, supply a
34+
* customized {@link RetryTemplate}. Stateless retry is appropriate if your listener can
35+
* be called repeatedly between failures with no side effects. The semantics of stateless
36+
* retry mean that a listener exception is not propagated to the container until the retry
37+
* attempts are exhausted. When the retry attempts are exhausted it can be processed using
38+
* a {@link StreamMessageRecoverer} if one is provided.
39+
*
40+
* @author Gary Russell
41+
*
42+
* @see RetryOperations#execute(org.springframework.retry.RetryCallback,org.springframework.retry.RecoveryCallback)
43+
*/
44+
public class StreamRetryOperationsInterceptorFactoryBean extends StatelessRetryOperationsInterceptorFactoryBean {
45+
46+
@Override
47+
protected MethodInvocationRecoverer<?> createRecoverer() {
48+
return (args, cause) -> {
49+
StreamMessageRecoverer messageRecoverer = (StreamMessageRecoverer) getMessageRecoverer();
50+
Object arg = args[0];
51+
if (arg instanceof org.springframework.amqp.core.Message) {
52+
return super.recover(args, cause);
53+
}
54+
else {
55+
if (messageRecoverer == null) {
56+
this.logger.warn("Message(s) dropped on recovery: " + arg, cause);
57+
}
58+
else {
59+
messageRecoverer.recover((Message) arg, (Context) args[1], cause);
60+
}
61+
return null;
62+
}
63+
};
64+
}
65+
66+
/**
67+
* Set a {@link StreamMessageRecoverer} to call when retries are exhausted.
68+
* @param messageRecoverer the recoverer.
69+
*/
70+
public void setStreamMessageRecoverer(StreamMessageRecoverer messageRecoverer) {
71+
super.setMessageRecoverer(messageRecoverer);
72+
}
73+
74+
@Override
75+
public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
76+
throw new UnsupportedOperationException("Use setStreamMessageRecoverer() instead");
77+
}
78+
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides classes supporting retries.
3+
*/
4+
package org.springframework.rabbit.stream.retry;

0 commit comments

Comments
 (0)