Skip to content

Commit 09cb844

Browse files
committed
Instrument Scheduled methods for observability
This commit enhances the `ScheduledAnnotationBeanPostProcessor` to instrument `@Scheduled` methods declared on beans. This will create `"tasks.scheduled.execution"` observations for each execution of a scheduled method. This supports both blocking and reactive variants. By default, observations are no-ops; developers must configure the current `ObservationRegistry` on the `ScheduledTaskRegistrar` by using a `SchedulingConfigurer`. Closes gh-29883
1 parent 842569c commit 09cb844

14 files changed

+899
-23
lines changed

Diff for: framework-docs/modules/ROOT/pages/integration/observability.adoc

+30-2
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@ As outlined xref:integration/observability.adoc[at the beginning of this section
2121
|===
2222
|Observation name |Description
2323

24-
|xref:integration/observability.adoc#http-client[`"http.client.requests"`]
24+
|xref:integration/observability.adoc#observability.http-client[`"http.client.requests"`]
2525
|Time spent for HTTP client exchanges
2626

27-
|xref:integration/observability.adoc#http-server[`"http.server.requests"`]
27+
|xref:integration/observability.adoc#observability.http-server[`"http.server.requests"`]
2828
|Processing time for HTTP server exchanges at the Framework level
29+
30+
|xref:integration/observability.adoc#observability.tasks-scheduled[`"tasks.scheduled.execution"`]
31+
|Processing time for an execution of a `@Scheduled` task
2932
|===
3033

3134
NOTE: Observations are using Micrometer's official naming convention, but Metrics names will be automatically converted
@@ -79,6 +82,31 @@ include-code::./ServerRequestObservationFilter[]
7982

8083
You can configure `ObservationFilter` instances on the `ObservationRegistry`.
8184

85+
[[observability.tasks-scheduled]]
86+
== @Scheduled tasks instrumentation
87+
88+
An Observation is created for xref:integration/scheduling.adoc#scheduling-enable-annotation-support[each execution of an `@Scheduled` task].
89+
Applications need to configure the `ObservationRegistry` on the `ScheduledTaskRegistrar` to enable the recording of observations.
90+
This can be done by declaring a `SchedulingConfigurer` bean that sets the observation registry:
91+
92+
include-code::./ObservationSchedulingConfigurer[]
93+
94+
It is using the `org.springframework.scheduling.config.DefaultScheduledTaskObservationConvention` by default, backed by the `ScheduledTaskObservationContext`.
95+
You can configure a custom implementation on the `ObservationRegistry` directly.
96+
During the execution of the scheduled method, the current observation is restored in the `ThreadLocal` context or the Reactor context (if the scheduled method returns a `Mono` or `Flux` type).
97+
98+
By default, the following `KeyValues` are created:
99+
100+
.Low cardinality Keys
101+
[cols="a,a"]
102+
|===
103+
|Name | Description
104+
|`exception` _(required)_|Name of the exception thrown during the execution, or `KeyValue#NONE_VALUE`} if no exception happened.
105+
|`method.name` _(required)_|Name of Java `Method` that is scheduled for execution.
106+
|`outcome` _(required)_|Outcome of the method execution. Can be `"SUCCESS"`, `"ERROR"` or `"UNKNOWN"` (if for example the operation was cancelled during execution.
107+
|`target.type` _(required)_|Simple class name of the bean instance that holds the scheduled method.
108+
|===
109+
82110

83111
[[observability.http-server]]
84112
== HTTP Server instrumentation
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.docs.integration.observability.tasksscheduled;
18+
19+
20+
import io.micrometer.observation.ObservationRegistry;
21+
22+
import org.springframework.scheduling.annotation.SchedulingConfigurer;
23+
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
24+
25+
public class ObservationSchedulingConfigurer implements SchedulingConfigurer {
26+
27+
private final ObservationRegistry observationRegistry;
28+
29+
public ObservationSchedulingConfigurer(ObservationRegistry observationRegistry) {
30+
this.observationRegistry = observationRegistry;
31+
}
32+
33+
@Override
34+
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
35+
taskRegistrar.setObservationRegistry(this.observationRegistry);
36+
}
37+
38+
}

Diff for: spring-context/spring-context.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ dependencies {
1111
api(project(":spring-beans"))
1212
api(project(":spring-core"))
1313
api(project(":spring-expression"))
14+
api("io.micrometer:micrometer-observation")
1415
optional(project(":spring-instrument"))
1516
optional("jakarta.annotation:jakarta.annotation-api")
1617
optional("jakarta.ejb:jakarta.ejb-api")
@@ -41,6 +42,8 @@ dependencies {
4142
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
4243
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
4344
testImplementation("io.reactivex.rxjava3:rxjava")
45+
testImplementation('io.micrometer:context-propagation')
46+
testImplementation("io.micrometer:micrometer-observation-test")
4447
testRuntimeOnly("jakarta.xml.bind:jakarta.xml.bind-api")
4548
testRuntimeOnly("org.glassfish:jakarta.el")
4649
// Substitute for javax.management:jmxremote_optional:1.0.1_04 (not available on Maven Central)

Diff for: spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,10 @@ protected void processScheduled(Scheduled scheduled, Method method, Object bean)
414414
* accordingly. The Runnable can represent either a synchronous method invocation
415415
* (see {@link #processScheduledSync(Scheduled, Method, Object)}) or an asynchronous
416416
* one (see {@link #processScheduledAsync(Scheduled, Method, Object)}).
417+
* @param scheduled the {@code @Scheduled} annotation
418+
* @param runnable the runnable to be scheduled
419+
* @param method the method that the annotation has been declared on
420+
* @param bean the target bean instance
417421
*/
418422
protected void processScheduledTask(Scheduled scheduled, Runnable runnable, Method method, Object bean) {
419423
try {
@@ -578,6 +582,7 @@ protected void processScheduledAsync(Scheduled scheduled, Method method, Object
578582
Runnable task;
579583
try {
580584
task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, scheduled,
585+
this.registrar::getObservationRegistry,
581586
this.reactiveSubscriptions.computeIfAbsent(bean, k -> new CopyOnWriteArrayList<>()));
582587
}
583588
catch (IllegalArgumentException ex) {
@@ -598,7 +603,7 @@ protected void processScheduledAsync(Scheduled scheduled, Method method, Object
598603
protected Runnable createRunnable(Object target, Method method) {
599604
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
600605
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
601-
return new ScheduledMethodRunnable(target, invocableMethod);
606+
return new ScheduledMethodRunnable(target, invocableMethod, this.registrar::getObservationRegistry);
602607
}
603608

604609
private static Duration toDuration(long value, TimeUnit timeUnit) {

Diff for: spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java

+54-11
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
import java.lang.reflect.Method;
2121
import java.util.List;
2222
import java.util.concurrent.CountDownLatch;
23+
import java.util.function.Supplier;
2324

25+
import io.micrometer.observation.Observation;
26+
import io.micrometer.observation.ObservationRegistry;
27+
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
2428
import org.apache.commons.logging.Log;
2529
import org.apache.commons.logging.LogFactory;
2630
import org.reactivestreams.Publisher;
@@ -34,16 +38,22 @@
3438
import org.springframework.core.ReactiveAdapter;
3539
import org.springframework.core.ReactiveAdapterRegistry;
3640
import org.springframework.lang.Nullable;
41+
import org.springframework.scheduling.config.DefaultScheduledTaskObservationConvention;
42+
import org.springframework.scheduling.config.ScheduledTaskObservationContext;
43+
import org.springframework.scheduling.config.ScheduledTaskObservationConvention;
3744
import org.springframework.util.Assert;
3845
import org.springframework.util.ClassUtils;
3946
import org.springframework.util.ReflectionUtils;
4047
import org.springframework.util.StringUtils;
4148

49+
import static org.springframework.scheduling.config.ScheduledTaskObservationDocumentation.TASKS_SCHEDULED_EXECUTION;
50+
4251
/**
4352
* Helper class for @{@link ScheduledAnnotationBeanPostProcessor} to support reactive
4453
* cases without a dependency on optional classes.
4554
*
4655
* @author Simon Baslé
56+
* @author Brian Clozel
4757
* @since 6.1
4858
*/
4959
abstract class ScheduledAnnotationReactiveSupport {
@@ -157,11 +167,12 @@ static Publisher<?> getPublisherFor(Method method, Object bean) {
157167
* delay is applied until the next iteration).
158168
*/
159169
static Runnable createSubscriptionRunnable(Method method, Object targetBean, Scheduled scheduled,
160-
List<Runnable> subscriptionTrackerRegistry) {
170+
Supplier<ObservationRegistry> observationRegistrySupplier, List<Runnable> subscriptionTrackerRegistry) {
161171

162172
boolean shouldBlock = (scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString()));
163173
Publisher<?> publisher = getPublisherFor(method, targetBean);
164-
return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry);
174+
Supplier<ScheduledTaskObservationContext> contextSupplier = () -> new ScheduledTaskObservationContext(targetBean, method);
175+
return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier);
165176
}
166177

167178

@@ -173,23 +184,33 @@ static final class SubscribingRunnable implements Runnable {
173184

174185
private final Publisher<?> publisher;
175186

187+
private static final ScheduledTaskObservationConvention DEFAULT_CONVENTION = new DefaultScheduledTaskObservationConvention();
188+
176189
final boolean shouldBlock;
177190

178191
private final List<Runnable> subscriptionTrackerRegistry;
179192

180-
SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock, List<Runnable> subscriptionTrackerRegistry) {
193+
final Supplier<ObservationRegistry> observationRegistrySupplier;
194+
195+
final Supplier<ScheduledTaskObservationContext> contextSupplier;
196+
197+
SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock, List<Runnable> subscriptionTrackerRegistry,
198+
Supplier<ObservationRegistry> observationRegistrySupplier, Supplier<ScheduledTaskObservationContext> contextSupplier) {
181199
this.publisher = publisher;
182200
this.shouldBlock = shouldBlock;
183201
this.subscriptionTrackerRegistry = subscriptionTrackerRegistry;
202+
this.observationRegistrySupplier = observationRegistrySupplier;
203+
this.contextSupplier = contextSupplier;
184204
}
185205

186206
@Override
187207
public void run() {
208+
Observation observation = TASKS_SCHEDULED_EXECUTION.observation(null, DEFAULT_CONVENTION,
209+
this.contextSupplier, this.observationRegistrySupplier.get());
188210
if (this.shouldBlock) {
189211
CountDownLatch latch = new CountDownLatch(1);
190-
TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry, latch);
191-
this.subscriptionTrackerRegistry.add(subscriber);
192-
this.publisher.subscribe(subscriber);
212+
TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry, observation, latch);
213+
subscribe(subscriber, observation);
193214
try {
194215
latch.await();
195216
}
@@ -198,8 +219,19 @@ public void run() {
198219
}
199220
}
200221
else {
201-
TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry);
202-
this.subscriptionTrackerRegistry.add(subscriber);
222+
TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry, observation);
223+
subscribe(subscriber, observation);
224+
}
225+
}
226+
227+
private void subscribe(TrackingSubscriber subscriber, Observation observation) {
228+
this.subscriptionTrackerRegistry.add(subscriber);
229+
if (reactorPresent) {
230+
Flux.from(this.publisher)
231+
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation))
232+
.subscribe(subscriber);
233+
}
234+
else {
203235
this.publisher.subscribe(subscriber);
204236
}
205237
}
@@ -215,6 +247,8 @@ private static final class TrackingSubscriber implements Subscriber<Object>, Run
215247

216248
private final List<Runnable> subscriptionTrackerRegistry;
217249

250+
private final Observation observation;
251+
218252
@Nullable
219253
private final CountDownLatch blockingLatch;
220254

@@ -225,19 +259,21 @@ private static final class TrackingSubscriber implements Subscriber<Object>, Run
225259
@Nullable
226260
private Subscription subscription;
227261

228-
TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry) {
229-
this(subscriptionTrackerRegistry, null);
262+
TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry, Observation observation) {
263+
this(subscriptionTrackerRegistry, observation, null);
230264
}
231265

232-
TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry, @Nullable CountDownLatch latch) {
266+
TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry, Observation observation, @Nullable CountDownLatch latch) {
233267
this.subscriptionTrackerRegistry = subscriptionTrackerRegistry;
268+
this.observation = observation;
234269
this.blockingLatch = latch;
235270
}
236271

237272
@Override
238273
public void run() {
239274
if (this.subscription != null) {
240275
this.subscription.cancel();
276+
this.observation.stop();
241277
}
242278
if (this.blockingLatch != null) {
243279
this.blockingLatch.countDown();
@@ -247,6 +283,7 @@ public void run() {
247283
@Override
248284
public void onSubscribe(Subscription subscription) {
249285
this.subscription = subscription;
286+
this.observation.start();
250287
subscription.request(Integer.MAX_VALUE);
251288
}
252289

@@ -259,6 +296,8 @@ public void onNext(Object obj) {
259296
public void onError(Throwable ex) {
260297
this.subscriptionTrackerRegistry.remove(this);
261298
logger.warn("Unexpected error occurred in scheduled reactive task", ex);
299+
this.observation.error(ex);
300+
this.observation.stop();
262301
if (this.blockingLatch != null) {
263302
this.blockingLatch.countDown();
264303
}
@@ -267,6 +306,10 @@ public void onError(Throwable ex) {
267306
@Override
268307
public void onComplete() {
269308
this.subscriptionTrackerRegistry.remove(this);
309+
if (this.observation.getContext() instanceof ScheduledTaskObservationContext context) {
310+
context.setComplete(true);
311+
}
312+
this.observation.stop();
270313
if (this.blockingLatch != null) {
271314
this.blockingLatch.countDown();
272315
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.scheduling.config;
18+
19+
import io.micrometer.common.KeyValue;
20+
import io.micrometer.common.KeyValues;
21+
22+
import org.springframework.util.StringUtils;
23+
24+
import static org.springframework.scheduling.config.ScheduledTaskObservationDocumentation.LowCardinalityKeyNames;
25+
26+
/**
27+
* Default implementation for {@link ScheduledTaskObservationConvention}.
28+
* @author Brian Clozel
29+
* @since 6.1.0
30+
*/
31+
public class DefaultScheduledTaskObservationConvention implements ScheduledTaskObservationConvention {
32+
33+
private static final String DEFAULT_NAME = "tasks.scheduled.execution";
34+
35+
private static final KeyValue EXCEPTION_NONE = KeyValue.of(LowCardinalityKeyNames.EXCEPTION, KeyValue.NONE_VALUE);
36+
37+
private static final KeyValue OUTCOME_SUCCESS = KeyValue.of(LowCardinalityKeyNames.OUTCOME, "SUCCESS");
38+
39+
private static final KeyValue OUTCOME_ERROR = KeyValue.of(LowCardinalityKeyNames.OUTCOME, "ERROR");
40+
41+
private static final KeyValue OUTCOME_UNKNOWN = KeyValue.of(LowCardinalityKeyNames.OUTCOME, "UNKNOWN");
42+
43+
@Override
44+
public String getName() {
45+
return DEFAULT_NAME;
46+
}
47+
48+
@Override
49+
public String getContextualName(ScheduledTaskObservationContext context) {
50+
return "task " + StringUtils.uncapitalize(context.getTargetClass().getSimpleName())
51+
+ "." + context.getMethod().getName();
52+
}
53+
54+
@Override
55+
public KeyValues getLowCardinalityKeyValues(ScheduledTaskObservationContext context) {
56+
return KeyValues.of(exception(context), methodName(context), outcome(context), targetType(context));
57+
}
58+
59+
protected KeyValue exception(ScheduledTaskObservationContext context) {
60+
if (context.getError() != null) {
61+
return KeyValue.of(LowCardinalityKeyNames.EXCEPTION, context.getError().getClass().getSimpleName());
62+
}
63+
return EXCEPTION_NONE;
64+
}
65+
66+
protected KeyValue methodName(ScheduledTaskObservationContext context) {
67+
return KeyValue.of(LowCardinalityKeyNames.METHOD_NAME, context.getMethod().getName());
68+
}
69+
70+
protected KeyValue outcome(ScheduledTaskObservationContext context) {
71+
if (context.getError() != null) {
72+
return OUTCOME_ERROR;
73+
}
74+
else if (!context.isComplete()) {
75+
return OUTCOME_UNKNOWN;
76+
}
77+
return OUTCOME_SUCCESS;
78+
}
79+
80+
protected KeyValue targetType(ScheduledTaskObservationContext context) {
81+
return KeyValue.of(LowCardinalityKeyNames.TARGET_TYPE, context.getTargetClass().getSimpleName());
82+
}
83+
84+
}

0 commit comments

Comments
 (0)