diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java index a8cfcbc3448..98c4f9e86cf 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java @@ -595,14 +595,16 @@ private Message convertToRequestMessage(Object object, boolean shouldConvert) private Message sendAndReceiveWithObservation(MessageChannel requestChannel, Object object, Message requestMessage) { + MessageRequestReplyReceiverContext context = + new MessageRequestReplyReceiverContext(requestMessage, getComponentName()); + return IntegrationObservation.GATEWAY.observation(this.observationConvention, DefaultMessageRequestReplyReceiverObservationConvention.INSTANCE, - () -> new MessageRequestReplyReceiverContext(requestMessage, getComponentName()), - this.observationRegistry) - .>observeWithContext((ctx) -> { + () -> context, this.observationRegistry) + .observe(() -> { Message replyMessage = doSendAndReceive(requestChannel, object, requestMessage); if (replyMessage != null) { - ctx.setResponse(replyMessage); + context.setResponse(replyMessage); } return replyMessage; }); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/IntegrationObservabilityZipkinTests.java b/spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/IntegrationObservabilityZipkinTests.java index 9f464ce2d91..e572ebf29d1 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/IntegrationObservabilityZipkinTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/IntegrationObservabilityZipkinTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,12 +26,14 @@ import io.micrometer.tracing.test.SampleTestRunner; import io.micrometer.tracing.test.simple.SpansAssert; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.EndpointId; import org.springframework.integration.annotation.Poller; import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.NullChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.channel.interceptor.ObservationPropagationChannelInterceptor; import org.springframework.integration.config.EnableIntegration; @@ -64,13 +66,20 @@ public TracingSetup[] getTracingSetup() { public SampleTestRunnerConsumer yourCode() { return (bb, meterRegistry) -> { ObservationRegistry observationRegistry = getObservationRegistry(); + + observationRegistry.observationConfig() + .observationPredicate((name, context) -> + !(context instanceof MessageRequestReplyReceiverContext messageRequestReplyReceiverContext) + || !messageRequestReplyReceiverContext.getGatewayName() + .equals("skippedObservationInboundGateway")); + try (AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext()) { applicationContext.registerBean(ObservationRegistry.class, () -> observationRegistry); applicationContext.register(ObservationIntegrationTestConfiguration.class); applicationContext.refresh(); TestMessagingGatewaySupport messagingGateway = - applicationContext.getBean(TestMessagingGatewaySupport.class); + applicationContext.getBean("testInboundGateway", TestMessagingGatewaySupport.class); Message receive = messagingGateway.process(new GenericMessage<>("test data")); @@ -78,6 +87,14 @@ public SampleTestRunnerConsumer yourCode() { .extracting("payload").isEqualTo("test data"); var configuration = applicationContext.getBean(ObservationIntegrationTestConfiguration.class); + messagingGateway = + applicationContext.getBean("skippedObservationInboundGateway", + TestMessagingGatewaySupport.class); + + receive = messagingGateway.process(new GenericMessage<>("void data")); + + assertThat(receive).isNull(); + assertThat(configuration.observedHandlerLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -112,7 +129,7 @@ public SampleTestRunnerConsumer yourCode() { @EnableIntegration @EnableIntegrationManagement( observationPatterns = { - "${spring.integration.management.observation-patterns:testInboundGateway,queueChannel,observedEndpoint}", + "${spring.integration.management.observation-patterns:testInboundGateway,skippedObservationInboundGateway,queueChannel,observedEndpoint}", "${spring.integration.management.observation-patterns:}" }) public static class ObservationIntegrationTestConfiguration { @@ -126,7 +143,7 @@ public ChannelInterceptor observationPropagationInterceptor(ObservationRegistry } @Bean - TestMessagingGatewaySupport testInboundGateway(PollableChannel queueChannel) { + TestMessagingGatewaySupport testInboundGateway(@Qualifier("queueChannel") PollableChannel queueChannel) { TestMessagingGatewaySupport messagingGatewaySupport = new TestMessagingGatewaySupport(); messagingGatewaySupport.setObservationConvention( new DefaultMessageRequestReplyReceiverObservationConvention() { @@ -146,6 +163,15 @@ public PollableChannel queueChannel() { return new QueueChannel(); } + + @Bean + TestMessagingGatewaySupport skippedObservationInboundGateway() { + TestMessagingGatewaySupport messagingGatewaySupport = new TestMessagingGatewaySupport(); + messagingGatewaySupport.setRequestChannel(new NullChannel()); + messagingGatewaySupport.setReplyTimeout(0); + return messagingGatewaySupport; + } + @Bean @EndpointId("observedEndpoint") @ServiceActivator(inputChannel = "queueChannel",