Skip to content

Commit eb94433

Browse files
authored
GH-8664: Do not use broken observeWithContext() (#8665)
Fixes #8664 When an `Observation` is turned to `NoopObservation`, the `observeWithContext()` fails with `ClassCastException` since `NoopObservation` serves just plain `Context` not the one we supplied * Fix `MessagingGatewaySupport.sendAndReceiveWithObservation()` same way as it is in version `6.0.x` * Modify `IntegrationObservabilityZipkinTests` to reject some `Observation` via `observationPredicate()` configuration **Cherry-pick to `6.1.x`**
1 parent 076b34d commit eb94433

File tree

2 files changed

+36
-8
lines changed

2 files changed

+36
-8
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -595,14 +595,16 @@ private Message<?> convertToRequestMessage(Object object, boolean shouldConvert)
595595
private Message<?> sendAndReceiveWithObservation(MessageChannel requestChannel, Object object,
596596
Message<?> requestMessage) {
597597

598+
MessageRequestReplyReceiverContext context =
599+
new MessageRequestReplyReceiverContext(requestMessage, getComponentName());
600+
598601
return IntegrationObservation.GATEWAY.observation(this.observationConvention,
599602
DefaultMessageRequestReplyReceiverObservationConvention.INSTANCE,
600-
() -> new MessageRequestReplyReceiverContext(requestMessage, getComponentName()),
601-
this.observationRegistry)
602-
.<MessageRequestReplyReceiverContext, Message<?>>observeWithContext((ctx) -> {
603+
() -> context, this.observationRegistry)
604+
.observe(() -> {
603605
Message<?> replyMessage = doSendAndReceive(requestChannel, object, requestMessage);
604606
if (replyMessage != null) {
605-
ctx.setResponse(replyMessage);
607+
context.setResponse(replyMessage);
606608
}
607609
return replyMessage;
608610
});

spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/IntegrationObservabilityZipkinTests.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,12 +26,14 @@
2626
import io.micrometer.tracing.test.SampleTestRunner;
2727
import io.micrometer.tracing.test.simple.SpansAssert;
2828

29+
import org.springframework.beans.factory.annotation.Qualifier;
2930
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
3031
import org.springframework.context.annotation.Bean;
3132
import org.springframework.context.annotation.Configuration;
3233
import org.springframework.integration.annotation.EndpointId;
3334
import org.springframework.integration.annotation.Poller;
3435
import org.springframework.integration.annotation.ServiceActivator;
36+
import org.springframework.integration.channel.NullChannel;
3537
import org.springframework.integration.channel.QueueChannel;
3638
import org.springframework.integration.channel.interceptor.ObservationPropagationChannelInterceptor;
3739
import org.springframework.integration.config.EnableIntegration;
@@ -64,20 +66,35 @@ public TracingSetup[] getTracingSetup() {
6466
public SampleTestRunnerConsumer yourCode() {
6567
return (bb, meterRegistry) -> {
6668
ObservationRegistry observationRegistry = getObservationRegistry();
69+
70+
observationRegistry.observationConfig()
71+
.observationPredicate((name, context) ->
72+
!(context instanceof MessageRequestReplyReceiverContext messageRequestReplyReceiverContext)
73+
|| !messageRequestReplyReceiverContext.getGatewayName()
74+
.equals("skippedObservationInboundGateway"));
75+
6776
try (AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext()) {
6877
applicationContext.registerBean(ObservationRegistry.class, () -> observationRegistry);
6978
applicationContext.register(ObservationIntegrationTestConfiguration.class);
7079
applicationContext.refresh();
7180

7281
TestMessagingGatewaySupport messagingGateway =
73-
applicationContext.getBean(TestMessagingGatewaySupport.class);
82+
applicationContext.getBean("testInboundGateway", TestMessagingGatewaySupport.class);
7483

7584
Message<?> receive = messagingGateway.process(new GenericMessage<>("test data"));
7685

7786
assertThat(receive).isNotNull()
7887
.extracting("payload").isEqualTo("test data");
7988
var configuration = applicationContext.getBean(ObservationIntegrationTestConfiguration.class);
8089

90+
messagingGateway =
91+
applicationContext.getBean("skippedObservationInboundGateway",
92+
TestMessagingGatewaySupport.class);
93+
94+
receive = messagingGateway.process(new GenericMessage<>("void data"));
95+
96+
assertThat(receive).isNull();
97+
8198
assertThat(configuration.observedHandlerLatch.await(10, TimeUnit.SECONDS)).isTrue();
8299
}
83100

@@ -112,7 +129,7 @@ public SampleTestRunnerConsumer yourCode() {
112129
@EnableIntegration
113130
@EnableIntegrationManagement(
114131
observationPatterns = {
115-
"${spring.integration.management.observation-patterns:testInboundGateway,queueChannel,observedEndpoint}",
132+
"${spring.integration.management.observation-patterns:testInboundGateway,skippedObservationInboundGateway,queueChannel,observedEndpoint}",
116133
"${spring.integration.management.observation-patterns:}"
117134
})
118135
public static class ObservationIntegrationTestConfiguration {
@@ -126,7 +143,7 @@ public ChannelInterceptor observationPropagationInterceptor(ObservationRegistry
126143
}
127144

128145
@Bean
129-
TestMessagingGatewaySupport testInboundGateway(PollableChannel queueChannel) {
146+
TestMessagingGatewaySupport testInboundGateway(@Qualifier("queueChannel") PollableChannel queueChannel) {
130147
TestMessagingGatewaySupport messagingGatewaySupport = new TestMessagingGatewaySupport();
131148
messagingGatewaySupport.setObservationConvention(
132149
new DefaultMessageRequestReplyReceiverObservationConvention() {
@@ -146,6 +163,15 @@ public PollableChannel queueChannel() {
146163
return new QueueChannel();
147164
}
148165

166+
167+
@Bean
168+
TestMessagingGatewaySupport skippedObservationInboundGateway() {
169+
TestMessagingGatewaySupport messagingGatewaySupport = new TestMessagingGatewaySupport();
170+
messagingGatewaySupport.setRequestChannel(new NullChannel());
171+
messagingGatewaySupport.setReplyTimeout(0);
172+
return messagingGatewaySupport;
173+
}
174+
149175
@Bean
150176
@EndpointId("observedEndpoint")
151177
@ServiceActivator(inputChannel = "queueChannel",

0 commit comments

Comments
 (0)