Skip to content

Commit edadc79

Browse files
committed
Fix reactive HTTP server Observation instrumentation
Prior to this commit, regressions were introduced with gh-31417: 1. the observation keyvalues would be inconsistent with the HTTP response 2. the observation scope would not cover all controller handlers, causing traceIds to be missing The first issue is caused by the fact that in case of error signals, the observation was stopped before the response was fully committed, which means further processing could happen and update the response status. This commit delays the stop event until the response is committed in case of errors. The second problem is caused by the change from a `contextWrite` operator to using the `tap` operator with a `SignalListener`. The observation was started in the `doOnSubscription` callback, which is too late in some cases. If the WebFlux controller handler is synchronous non-blocking, the execution of the handler is performed before the subscription happens. This means that for those handlers, the observation was not started, even if the current observation was present in the reactor context. This commit changes the `doOnSubscription` to `doFirst` to ensure that the observation is started at the right time. Fixes gh-31715 Fixes gh-31716
1 parent 3783d31 commit edadc79

File tree

2 files changed

+45
-16
lines changed

2 files changed

+45
-16
lines changed

spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java

+19-14
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,17 @@ public ObservationSignalListener(ServerRequestObservationContext observationCont
121121
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
122122
}
123123

124-
@Override
125-
public void doOnSubscription() throws Throwable {
126-
this.observation.start();
127-
}
128124

129125
@Override
130126
public Context addToContext(Context originalContext) {
131127
return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation);
132128
}
133129

130+
@Override
131+
public void doFirst() throws Throwable {
132+
this.observation.start();
133+
}
134+
134135
@Override
135136
public void doOnCancel() throws Throwable {
136137
if (this.observationRecorded.compareAndSet(false, true)) {
@@ -142,16 +143,7 @@ public void doOnCancel() throws Throwable {
142143
@Override
143144
public void doOnComplete() throws Throwable {
144145
if (this.observationRecorded.compareAndSet(false, true)) {
145-
ServerHttpResponse response = this.observationContext.getResponse();
146-
if (response.isCommitted()) {
147-
this.observation.stop();
148-
}
149-
else {
150-
response.beforeCommit(() -> {
151-
this.observation.stop();
152-
return Mono.empty();
153-
});
154-
}
146+
doOnTerminate(this.observationContext);
155147
}
156148
}
157149

@@ -162,8 +154,21 @@ public void doOnError(Throwable error) throws Throwable {
162154
this.observationContext.setConnectionAborted(true);
163155
}
164156
this.observationContext.setError(error);
157+
doOnTerminate(this.observationContext);
158+
}
159+
}
160+
161+
private void doOnTerminate(ServerRequestObservationContext context) {
162+
ServerHttpResponse response = context.getResponse();
163+
if (response.isCommitted()) {
165164
this.observation.stop();
166165
}
166+
else {
167+
response.beforeCommit(() -> {
168+
this.observation.stop();
169+
return Mono.empty();
170+
});
171+
}
167172
}
168173
}
169174

spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919

2020
import java.util.Optional;
2121

22+
import io.micrometer.observation.Observation;
2223
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
2324
import io.micrometer.observation.tck.TestObservationRegistry;
2425
import io.micrometer.observation.tck.TestObservationRegistryAssert;
@@ -27,6 +28,7 @@
2728
import reactor.core.publisher.Mono;
2829
import reactor.test.StepVerifier;
2930

31+
import org.springframework.http.server.reactive.ServerHttpResponse;
3032
import org.springframework.http.server.reactive.observation.ServerRequestObservationContext;
3133
import org.springframework.web.server.ServerWebExchange;
3234
import org.springframework.web.server.WebFilterChain;
@@ -65,7 +67,10 @@ void filterShouldAddNewObservationToReactorContext() {
6567
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));
6668
exchange.getResponse().setRawStatusCode(200);
6769
WebFilterChain filterChain = webExchange -> Mono.deferContextual(contextView -> {
68-
assertThat(contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent();
70+
Observation observation = contextView.get(ObservationThreadLocalAccessor.KEY);
71+
assertThat(observation).isNotNull();
72+
// check that the observation was started
73+
assertThat(observation.getContext().getLowCardinalityKeyValue("outcome")).isNotNull();
6974
return Mono.empty();
7075
});
7176
this.filter.filter(exchange, filterChain).block();
@@ -99,6 +104,25 @@ void filterShouldRecordObservationWhenCancelled() {
99104
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "UNKNOWN");
100105
}
101106

107+
@Test
108+
void filterShouldStopObservationOnResponseCommit() {
109+
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));
110+
WebFilterChain filterChain = createFilterChain(filterExchange -> {
111+
throw new IllegalArgumentException("server error");
112+
});
113+
StepVerifier.create(this.filter.filter(exchange, filterChain).doOnError(throwable -> {
114+
ServerHttpResponse response = exchange.getResponse();
115+
response.setRawStatusCode(500);
116+
response.setComplete().block();
117+
}))
118+
.expectError(IllegalArgumentException.class)
119+
.verify();
120+
Optional<ServerRequestObservationContext> observationContext = ServerHttpObservationFilter.findObservationContext(exchange);
121+
assertThat(observationContext.get().getError()).isInstanceOf(IllegalArgumentException.class);
122+
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SERVER_ERROR");
123+
}
124+
125+
102126
private WebFilterChain createFilterChain(ThrowingConsumer<ServerWebExchange> exchangeConsumer) {
103127
return filterExchange -> {
104128
try {

0 commit comments

Comments
 (0)