Skip to content

Commit 2296e47

Browse files
committed
GH-9854: Special error for "too early message production"
Fixes: #9854 The well-known `Dispatcher has no subscribers` is not very informative when a message is produced from early application context initialization phase * Add internal `ApplicationRunningController` bean to handle early `start()` event * Check for this bean status from the `AbstractMessageChannel.send()` * Throw specific `MessageDispatchingException` to indicate that the message was produced from a wrong place * Adjust `ApplicationEventListeningMessageProducer` logic for `ContextStoppedEvent` & `ContextClosedEvent` to indicate that `AbstractMessageChannel` bean might not dispatch a message because the application context is not running
1 parent 84a3be1 commit 2296e47

File tree

10 files changed

+212
-47
lines changed

10 files changed

+212
-47
lines changed

Diff for: spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

+52-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,10 +34,14 @@
3434
import io.micrometer.observation.ObservationRegistry;
3535

3636
import org.springframework.beans.factory.BeanFactory;
37+
import org.springframework.context.ApplicationContext;
38+
import org.springframework.context.ConfigurableApplicationContext;
39+
import org.springframework.context.Lifecycle;
3740
import org.springframework.core.OrderComparator;
3841
import org.springframework.core.log.LogAccessor;
3942
import org.springframework.integration.IntegrationPattern;
4043
import org.springframework.integration.IntegrationPatternType;
44+
import org.springframework.integration.MessageDispatchingException;
4145
import org.springframework.integration.context.IntegrationContextUtils;
4246
import org.springframework.integration.context.IntegrationObjectSupport;
4347
import org.springframework.integration.history.MessageHistory;
@@ -110,6 +114,10 @@ public abstract class AbstractMessageChannel extends IntegrationObjectSupport
110114

111115
private volatile String fullChannelName;
112116

117+
private volatile boolean applicationRunning;
118+
119+
private volatile Lifecycle applicationRunningController;
120+
113121
@Override
114122
public String getComponentType() {
115123
return "channel";
@@ -319,6 +327,7 @@ public boolean send(Message<?> message) {
319327
public boolean send(Message<?> messageArg, long timeout) {
320328
Assert.notNull(messageArg, "message must not be null");
321329
Assert.notNull(messageArg.getPayload(), "message payload must not be null");
330+
assertApplicationRunning(messageArg);
322331
Message<?> message = messageArg;
323332
if (this.shouldTrack) {
324333
message = MessageHistory.write(message, this, getMessageBuilderFactory());
@@ -335,6 +344,39 @@ else if (this.metricsCaptor != null) {
335344
}
336345
}
337346

347+
private void assertApplicationRunning(Message<?> message) {
348+
if (!this.applicationRunning) {
349+
ApplicationContext applicationContext = getApplicationContext();
350+
this.applicationRunning =
351+
applicationContext == null ||
352+
!applicationContext.containsBean(
353+
IntegrationContextUtils.APPLICATION_RUNNING_CONTROLLER_BEAN_NAME);
354+
355+
if (!this.applicationRunning) {
356+
if (((ConfigurableApplicationContext) applicationContext).isActive()) {
357+
this.applicationRunningController =
358+
applicationContext.getBean(IntegrationContextUtils.APPLICATION_RUNNING_CONTROLLER_BEAN_NAME,
359+
Lifecycle.class);
360+
this.applicationRunning = this.applicationRunningController.isRunning();
361+
}
362+
}
363+
}
364+
365+
if (this.applicationRunning && this.applicationRunningController != null) {
366+
this.applicationRunning = this.applicationRunningController.isRunning();
367+
}
368+
369+
if (!this.applicationRunning) {
370+
throw new MessageDispatchingException(message,
371+
"""
372+
The application context is not ready to dispatch messages. \
373+
It has to be refreshed or started first. \
374+
Also, messages must not be emitted from initialization phase, \
375+
like 'afterPropertiesSet()', '@PostConstruct' or bean definition methods. \
376+
Consider to use 'SmartLifecycle.start()' instead.""");
377+
}
378+
}
379+
338380
private boolean sendWithObservation(Message<?> message, long timeout) {
339381
MutableMessage<?> messageToSend = MutableMessage.of(message);
340382
Observation observation = IntegrationObservation.PRODUCER.observation(
@@ -343,15 +385,15 @@ private boolean sendWithObservation(Message<?> message, long timeout) {
343385
() -> new MessageSenderContext(messageToSend, getComponentName()),
344386
this.observationRegistry);
345387
Boolean observe = observation.observe(() -> {
346-
Message<?> messageToSendInternal = messageToSend;
347-
if (message instanceof ErrorMessage errorMessage) {
348-
messageToSendInternal =
349-
new ErrorMessage(errorMessage.getPayload(),
350-
messageToSend.getHeaders(),
351-
errorMessage.getOriginalMessage());
352-
}
353-
return sendInternal(messageToSendInternal, timeout);
354-
});
388+
Message<?> messageToSendInternal = messageToSend;
389+
if (message instanceof ErrorMessage errorMessage) {
390+
messageToSendInternal =
391+
new ErrorMessage(errorMessage.getPayload(),
392+
messageToSend.getHeaders(),
393+
errorMessage.getOriginalMessage());
394+
}
395+
return sendInternal(messageToSendInternal, timeout);
396+
});
355397
return Boolean.TRUE.equals(observe);
356398
}
357399

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.config;
18+
19+
import org.springframework.context.SmartLifecycle;
20+
21+
/**
22+
* An infrastructure bean to hold the status of the application context when
23+
* it is ready for interaction: refreshed or started.
24+
* <p>
25+
* Well-known {@link org.springframework.context.ConfigurableApplicationContext#isRunning()}
26+
* (or {@link org.springframework.context.event.ContextRefreshedEvent})
27+
* is good for target applications, when all the beans are already started,
28+
* but most of Spring Integration channel adapters initiate their logic
29+
* from the {@link SmartLifecycle#start()} implementation, so it would be false report
30+
* that application is not running during start.
31+
* <p>
32+
* This implementation uses {@value Integer#MIN_VALUE} for its phase to be started as early as possible.
33+
*
34+
* @author Artem Bilan
35+
*
36+
* @since 6.5
37+
*/
38+
class ApplicationRunningController implements SmartLifecycle {
39+
40+
private volatile boolean running;
41+
42+
@Override
43+
public void start() {
44+
this.running = true;
45+
}
46+
47+
@Override
48+
public void stop() {
49+
this.running = false;
50+
}
51+
52+
@Override
53+
public boolean isRunning() {
54+
return this.running;
55+
}
56+
57+
@Override
58+
public int getPhase() {
59+
return Integer.MIN_VALUE;
60+
}
61+
62+
}

Diff for: spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -129,6 +129,7 @@ public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) t
129129
registerListMessageHandlerMethodFactory();
130130
registerIntegrationConfigurationReport();
131131
registerControlBusCommandRegistry();
132+
registerApplicationRunningController();
132133
}
133134

134135
@Override
@@ -453,6 +454,17 @@ private void registerControlBusCommandRegistry() {
453454
}
454455
}
455456

457+
private void registerApplicationRunningController() {
458+
if (!this.beanFactory.containsBean(IntegrationContextUtils.APPLICATION_RUNNING_CONTROLLER_BEAN_NAME)) {
459+
BeanDefinitionBuilder builder =
460+
BeanDefinitionBuilder.genericBeanDefinition(ApplicationRunningController.class)
461+
.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
462+
463+
this.registry.registerBeanDefinition(IntegrationContextUtils.APPLICATION_RUNNING_CONTROLLER_BEAN_NAME,
464+
builder.getBeanDefinition());
465+
}
466+
}
467+
456468
private static BeanDefinitionBuilder createMessageHandlerMethodFactoryBeanDefinition(boolean listCapable) {
457469
return BeanDefinitionBuilder.genericBeanDefinition(IntegrationMessageHandlerMethodFactory.class)
458470
.addConstructorArgValue(listCapable)

Diff for: spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -102,6 +102,8 @@ public abstract class IntegrationContextUtils {
102102

103103
public static final String CONTROL_BUS_COMMAND_REGISTRY_BEAN_NAME = "controlBusCommandRegistry";
104104

105+
public static final String APPLICATION_RUNNING_CONTROLLER_BEAN_NAME = "applicationRunningController";
106+
105107
/**
106108
* The default timeout for blocking operations like send and receive messages.
107109
* @since 6.1

Diff for: spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> messa
334334
return ErrorMessageUtils.getAttributeAccessor(message, null);
335335
}
336336

337-
private MessageChannel getRequiredOutputChannel() {
337+
protected MessageChannel getRequiredOutputChannel() {
338338
MessageChannel messageChannel = getOutputChannel();
339339
Assert.state(messageChannel != null, "The 'outputChannel' or `outputChannelName` must be configured");
340340
return messageChannel;

Diff for: spring-integration-core/src/test/java/org/springframework/integration/bus/ApplicationContextMessageBusTests.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,8 +24,10 @@
2424
import org.junit.jupiter.api.BeforeEach;
2525
import org.junit.jupiter.api.Test;
2626

27+
import org.springframework.beans.BeanUtils;
2728
import org.springframework.beans.factory.BeanFactory;
2829
import org.springframework.context.support.ClassPathXmlApplicationContext;
30+
import org.springframework.integration.MessageDispatchingException;
2931
import org.springframework.integration.channel.PublishSubscribeChannel;
3032
import org.springframework.integration.channel.QueueChannel;
3133
import org.springframework.integration.context.IntegrationContextUtils;
@@ -43,8 +45,10 @@
4345
import org.springframework.messaging.support.ErrorMessage;
4446
import org.springframework.messaging.support.GenericMessage;
4547
import org.springframework.scheduling.support.PeriodicTrigger;
48+
import org.springframework.util.ClassUtils;
4649

4750
import static org.assertj.core.api.Assertions.assertThat;
51+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
4852
import static org.mockito.Mockito.mock;
4953

5054
/**
@@ -67,14 +71,21 @@ void tearDown() {
6771
}
6872

6973
@Test
70-
public void endpointRegistrationWithInputChannelReference() {
74+
public void endpointRegistrationWithInputChannelReference() throws ClassNotFoundException {
75+
this.context.registerBean(IntegrationContextUtils.APPLICATION_RUNNING_CONTROLLER_BEAN_NAME,
76+
BeanUtils.instantiateClass(
77+
ClassUtils.forName(
78+
IntegrationContextUtils.BASE_PACKAGE + ".config.ApplicationRunningController", null)));
7179
QueueChannel sourceChannel = new QueueChannel();
80+
sourceChannel.setApplicationContext(this.context);
7281
QueueChannel targetChannel = new QueueChannel();
7382
this.context.registerChannel("sourceChannel", sourceChannel);
7483
this.context.registerChannel("targetChannel", targetChannel);
7584
Message<String> message = MessageBuilder.withPayload("test")
7685
.setReplyChannelName("targetChannel").build();
77-
sourceChannel.send(message);
86+
assertThatExceptionOfType(MessageDispatchingException.class)
87+
.isThrownBy(() -> sourceChannel.send(message))
88+
.withMessageStartingWith("The application context is not ready to dispatch messages.");
7889
AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() {
7990

8091
@Override
@@ -88,8 +99,15 @@ public Object handleRequestMessage(Message<?> message) {
8899
endpoint.setBeanFactory(mock(BeanFactory.class));
89100
this.context.registerEndpoint("testEndpoint", endpoint);
90101
this.context.refresh();
102+
sourceChannel.send(message);
91103
Message<?> result = targetChannel.receive(10000);
92104
assertThat(result.getPayload()).isEqualTo("test");
105+
106+
this.context.stop();
107+
108+
assertThatExceptionOfType(MessageDispatchingException.class)
109+
.isThrownBy(() -> sourceChannel.send(message))
110+
.withMessageStartingWith("The application context is not ready to dispatch messages.");
93111
}
94112

95113
@Test

Diff for: spring-integration-event/src/main/java/org/springframework/integration/event/inbound/ApplicationEventListeningMessageProducer.java

+17-17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,8 +26,8 @@
2626
import org.springframework.context.event.ContextStoppedEvent;
2727
import org.springframework.context.event.GenericApplicationListener;
2828
import org.springframework.context.support.AbstractApplicationContext;
29-
import org.springframework.core.Ordered;
3029
import org.springframework.core.ResolvableType;
30+
import org.springframework.integration.channel.AbstractMessageChannel;
3131
import org.springframework.integration.endpoint.ExpressionMessageProducerSupport;
3232
import org.springframework.messaging.Message;
3333
import org.springframework.util.Assert;
@@ -49,10 +49,10 @@
4949
public class ApplicationEventListeningMessageProducer extends ExpressionMessageProducerSupport
5050
implements GenericApplicationListener {
5151

52-
private volatile Set<ResolvableType> eventTypes;
53-
5452
private ApplicationEventMulticaster applicationEventMulticaster;
5553

54+
private volatile Set<ResolvableType> eventTypes;
55+
5656
private volatile long stoppedAt;
5757

5858
/**
@@ -106,12 +106,17 @@ protected void onInit() {
106106

107107
@Override
108108
public void onApplicationEvent(ApplicationEvent event) {
109-
if (isActive() || ((event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent)
110-
&& stoppedRecently())) {
109+
boolean contextFinished = event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent;
110+
if (isActive() || (contextFinished && stoppedRecently())) {
111+
112+
if (contextFinished && getRequiredOutputChannel() instanceof AbstractMessageChannel) {
113+
logger.warn("Messages for 'ContextStoppedEvent' or 'ContextClosedEvent' cannot be dispatched " +
114+
"via 'AbstractMessageChannel' beans: the application context is in the finished state." +
115+
"Consider to use custom 'MessageChannel' implementation without dispatching logic.");
116+
}
111117

112-
Object source = event.getSource();
113-
if (source instanceof Message<?>) {
114-
sendMessage((Message<?>) source);
118+
if (event.getSource() instanceof Message<?> message) {
119+
sendMessage(message);
115120
}
116121
else {
117122
Message<?> message;
@@ -128,8 +133,8 @@ && stoppedRecently())) {
128133
}
129134

130135
private Object extractObjectToSend(Object root) {
131-
if (root instanceof PayloadApplicationEvent) {
132-
return ((PayloadApplicationEvent<?>) root).getPayload();
136+
if (root instanceof PayloadApplicationEvent<?> payloadApplicationEvent) {
137+
return payloadApplicationEvent.getPayload();
133138
}
134139
return evaluatePayloadExpression(root);
135140
}
@@ -167,14 +172,9 @@ public boolean supportsEventType(ResolvableType eventType) {
167172
return false;
168173
}
169174

170-
@Override
171-
public boolean supportsSourceType(Class<?> sourceType) {
172-
return true;
173-
}
174-
175175
@Override
176176
public int getOrder() {
177-
return Ordered.LOWEST_PRECEDENCE;
177+
return HIGHEST_PRECEDENCE;
178178
}
179179

180180
@Override

0 commit comments

Comments
 (0)