Skip to content

Commit bde13e8

Browse files
Fix ForkJoinPool.execute() instrumentation on Java 21+ (#8560)
* fix(java-concurrent): Fix FJP instrumentation on Java 21+ * feat(java-concurrent): Add smoke tests for Java 21+ concurrent API
1 parent 8b041eb commit bde13e8

24 files changed

+570
-54
lines changed

dd-java-agent/instrumentation/java-concurrent/java-concurrent-21/src/main/java/datadog/trace/instrumentation/java/concurrent/structuredconcurrency/StructuredTaskScopeInstrumentation.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,16 @@
11
package datadog.trace.instrumentation.java.concurrent.structuredconcurrency;
22

33
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
4-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.FORK_JOIN_TASK;
5-
import static java.util.Collections.singleton;
64
import static java.util.Collections.singletonMap;
75
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
86

97
import com.google.auto.service.AutoService;
10-
import datadog.trace.agent.tooling.ExcludeFilterProvider;
118
import datadog.trace.agent.tooling.Instrumenter;
129
import datadog.trace.agent.tooling.InstrumenterModule;
1310
import datadog.trace.api.Platform;
1411
import datadog.trace.bootstrap.ContextStore;
1512
import datadog.trace.bootstrap.InstrumentationContext;
16-
import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter;
1713
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
18-
import java.util.Collection;
1914
import java.util.Map;
2015
import net.bytebuddy.asm.Advice;
2116

@@ -27,10 +22,7 @@
2722
@SuppressWarnings("unused")
2823
@AutoService(InstrumenterModule.class)
2924
public class StructuredTaskScopeInstrumentation extends InstrumenterModule.Tracing
30-
implements Instrumenter.ForBootstrap,
31-
Instrumenter.ForSingleType,
32-
Instrumenter.HasMethodAdvice,
33-
ExcludeFilterProvider {
25+
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
3426

3527
public StructuredTaskScopeInstrumentation() {
3628
super("java_concurrent", "structured_task_scope");
@@ -57,14 +49,6 @@ public void methodAdvice(MethodTransformer transformer) {
5749
transformer.applyAdvice(isConstructor(), getClass().getName() + "$ConstructorAdvice");
5850
}
5951

60-
@Override
61-
public Map<ExcludeFilter.ExcludeType, ? extends Collection<String>> excludedClasses() {
62-
// Prevent the ForkJoinPool instrumentation to enable the task scope too early on the carrier
63-
// thread rather than on the expected running thread, which is virtual by default.
64-
return singletonMap(
65-
FORK_JOIN_TASK, singleton("java.util.concurrent.ForkJoinTask$RunnableExecuteAction"));
66-
}
67-
6852
public static final class ConstructorAdvice {
6953
@Advice.OnMethodExit
7054
public static <T> void captureScope(
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
plugins {
2+
id 'application'
3+
id 'com.github.johnrengelman.shadow'
4+
}
5+
6+
ext {
7+
minJavaVersionForTests = JavaVersion.VERSION_21
8+
}
9+
10+
apply from: "$rootDir/gradle/java.gradle"
11+
12+
description = 'JDK 21 Concurrent Integration Tests'
13+
14+
java {
15+
toolchain {
16+
languageVersion = JavaLanguageVersion.of(21)
17+
}
18+
}
19+
tasks.withType(JavaCompile).configureEach {
20+
setJavaVersion(it, 21)
21+
sourceCompatibility = JavaVersion.VERSION_21
22+
targetCompatibility = JavaVersion.VERSION_21
23+
}
24+
25+
// Disable plugin tasks that do not support Java 21:
26+
// * forbiddenApis is missing classes
27+
// * spotless as the google-java-format version does not support Java 21 and can't be changed once applied
28+
// * spotbugs failed to read class using newer bytecode versions
29+
forbiddenApisMain {
30+
failOnMissingClasses = false
31+
}
32+
['spotlessApply', 'spotlessCheck', 'spotlessJava', 'spotbugsMain'].each {
33+
tasks.named(it).configure { enabled = false }
34+
}
35+
36+
application {
37+
mainClassName = 'datadog.smoketest.concurrent.ConcurrentApp'
38+
}
39+
40+
dependencies {
41+
implementation group: 'io.opentelemetry.instrumentation', name: 'opentelemetry-instrumentation-annotations', version: '2.13.3'
42+
testImplementation project(':dd-smoke-tests')
43+
}
44+
45+
tasks.withType(Test).configureEach {
46+
dependsOn "shadowJar"
47+
jvmArgs "-Ddatadog.smoketest.shadowJar.path=${tasks.shadowJar.archiveFile.get()}"
48+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package datadog.smoketest.concurrent;
2+
3+
import io.opentelemetry.instrumentation.annotations.WithSpan;
4+
import java.util.concurrent.ExecutionException;
5+
6+
public class ConcurrentApp {
7+
@WithSpan("main")
8+
public static void main(String[] args) {
9+
for (String arg : args) {
10+
try (FibonacciCalculator calc = getCalculator(arg)) {
11+
calc.computeFibonacci(10);
12+
} catch (ExecutionException | InterruptedException e) {
13+
throw new RuntimeException("Failed to compute fibonacci number.", e);
14+
}
15+
}
16+
}
17+
18+
private static FibonacciCalculator getCalculator(String name) {
19+
return switch (name) {
20+
case "virtualThreadExecute" -> new VirtualThreadExecuteCalculator();
21+
case "virtualThreadSubmitRunnable" -> new VirtualThreadSubmitRunnableCalculator();
22+
case "virtualThreadSubmitCallable" -> new VirtualThreadSubmitCallableCalculator();
23+
case "virtualThreadInvokeAll" -> new VirtualThreadInvokeAllCalculator();
24+
case "virtualThreadInvokeAny" -> new VirtualThreadInvokeAnyCalculator();
25+
default -> throw new RuntimeException("Unknown Fibonacci calculator: " + name);
26+
};
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package datadog.smoketest.concurrent;
2+
3+
import io.opentelemetry.instrumentation.annotations.WithSpan;
4+
import java.util.concurrent.CompletableFuture;
5+
import java.util.concurrent.ExecutionException;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
8+
9+
public class VirtualThreadExecuteCalculator implements FibonacciCalculator {
10+
private final ExecutorService executor;
11+
12+
public VirtualThreadExecuteCalculator() {
13+
this.executor = Executors.newVirtualThreadPerTaskExecutor();
14+
}
15+
16+
@Override
17+
public long computeFibonacci(int n) throws ExecutionException, InterruptedException {
18+
FibonacciExecuteTask task = new FibonacciExecuteTask(n);
19+
this.executor.execute(task);
20+
return task.result.get();
21+
}
22+
23+
public class FibonacciExecuteTask implements Runnable {
24+
private final long n;
25+
private final CompletableFuture<Long> result;
26+
27+
public FibonacciExecuteTask(long n) {
28+
this.n = n;
29+
this.result = new CompletableFuture<>();
30+
}
31+
32+
@WithSpan("compute")
33+
public void run() {
34+
if (this.n <= 1) {
35+
this.result.complete(this.n);
36+
return;
37+
}
38+
FibonacciExecuteTask task1 = new FibonacciExecuteTask(this.n - 1);
39+
FibonacciExecuteTask task2 = new FibonacciExecuteTask(this.n - 2);
40+
executor.execute(task1);
41+
executor.execute(task2);
42+
try {
43+
this.result.complete(task1.result.get() + task2.result.get());
44+
} catch (InterruptedException | ExecutionException e) {
45+
this.result.completeExceptionally(e);
46+
}
47+
}
48+
}
49+
50+
@Override
51+
public void close() {
52+
this.executor.shutdown();
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package datadog.smoketest.concurrent;
2+
3+
import static java.util.Set.of;
4+
5+
import io.opentelemetry.instrumentation.annotations.WithSpan;
6+
import java.util.List;
7+
import java.util.concurrent.Callable;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.ExecutionException;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
import java.util.concurrent.Future;
13+
14+
public class VirtualThreadInvokeAllCalculator implements FibonacciCalculator {
15+
private final ExecutorService executor;
16+
17+
public VirtualThreadInvokeAllCalculator() {
18+
this.executor = Executors.newVirtualThreadPerTaskExecutor();
19+
}
20+
21+
@Override
22+
public long computeFibonacci(int n) throws ExecutionException, InterruptedException {
23+
FibonacciSubmitTask task = new FibonacciSubmitTask(n);
24+
return this.executor.invokeAll(of(task)).getFirst().get();
25+
}
26+
27+
public class FibonacciSubmitTask implements Callable<Long> {
28+
private final long n;
29+
30+
public FibonacciSubmitTask(long n) {
31+
this.n = n;
32+
}
33+
34+
@WithSpan("compute")
35+
public Long call() throws ExecutionException, InterruptedException {
36+
if (this.n <= 1) {
37+
return this.n;
38+
}
39+
FibonacciSubmitTask task1 = new FibonacciSubmitTask(this.n - 1);
40+
FibonacciSubmitTask task2 = new FibonacciSubmitTask(this.n - 2);
41+
List<Future<Long>> futures = executor.invokeAll(List.of(task1, task2));
42+
return futures.getFirst().get() + futures.getLast().get();
43+
}
44+
}
45+
46+
@Override
47+
public void close() {
48+
this.executor.shutdown();
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package datadog.smoketest.concurrent;
2+
3+
import static java.util.Set.of;
4+
5+
import io.opentelemetry.instrumentation.annotations.WithSpan;
6+
import java.util.concurrent.Callable;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
10+
11+
public class VirtualThreadInvokeAnyCalculator implements FibonacciCalculator {
12+
private final ExecutorService executor;
13+
14+
public VirtualThreadInvokeAnyCalculator() {
15+
this.executor = Executors.newVirtualThreadPerTaskExecutor();
16+
}
17+
18+
@Override
19+
public long computeFibonacci(int n) throws ExecutionException, InterruptedException {
20+
FibonacciSubmitTask task = new FibonacciSubmitTask(n);
21+
return this.executor.invokeAny(of(task));
22+
}
23+
24+
public class FibonacciSubmitTask implements Callable<Long> {
25+
private final long n;
26+
27+
public FibonacciSubmitTask(long n) {
28+
this.n = n;
29+
}
30+
31+
@WithSpan("compute")
32+
public Long call() throws ExecutionException, InterruptedException {
33+
if (this.n <= 1) {
34+
return this.n;
35+
}
36+
FibonacciSubmitTask task1 = new FibonacciSubmitTask(this.n - 1);
37+
FibonacciSubmitTask task2 = new FibonacciSubmitTask(this.n - 2);
38+
return executor.invokeAny(of(task1)) + executor.invokeAny(of(task2));
39+
}
40+
}
41+
42+
@Override
43+
public void close() {
44+
this.executor.shutdown();
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package datadog.smoketest.concurrent;
2+
3+
import io.opentelemetry.instrumentation.annotations.WithSpan;
4+
import java.util.concurrent.Callable;
5+
import java.util.concurrent.ExecutionException;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.Future;
9+
10+
public class VirtualThreadSubmitCallableCalculator implements FibonacciCalculator {
11+
private final ExecutorService executor;
12+
13+
public VirtualThreadSubmitCallableCalculator() {
14+
this.executor = Executors.newVirtualThreadPerTaskExecutor();
15+
}
16+
17+
@Override
18+
public long computeFibonacci(int n) throws ExecutionException, InterruptedException {
19+
FibonacciSubmitTask task = new FibonacciSubmitTask(n);
20+
return this.executor.submit(task).get();
21+
}
22+
23+
public class FibonacciSubmitTask implements Callable<Long> {
24+
private final long n;
25+
26+
public FibonacciSubmitTask(long n) {
27+
this.n = n;
28+
}
29+
30+
@WithSpan("compute")
31+
public Long call() throws ExecutionException, InterruptedException {
32+
if (this.n <= 1) {
33+
return this.n;
34+
}
35+
FibonacciSubmitTask task1 = new FibonacciSubmitTask(this.n - 1);
36+
FibonacciSubmitTask task2 = new FibonacciSubmitTask(this.n - 2);
37+
Future<Long> future1 = executor.submit(task1);
38+
Future<Long> future2 = executor.submit(task2);
39+
return future1.get() + future2.get();
40+
}
41+
}
42+
43+
@Override
44+
public void close() {
45+
this.executor.shutdown();
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package datadog.smoketest.concurrent;
2+
3+
import io.opentelemetry.instrumentation.annotations.WithSpan;
4+
import java.util.concurrent.CompletableFuture;
5+
import java.util.concurrent.ExecutionException;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
8+
9+
public class VirtualThreadSubmitRunnableCalculator implements FibonacciCalculator {
10+
private final ExecutorService executor;
11+
12+
public VirtualThreadSubmitRunnableCalculator() {
13+
this.executor = Executors.newVirtualThreadPerTaskExecutor();
14+
}
15+
16+
@Override
17+
public long computeFibonacci(int n) throws ExecutionException, InterruptedException {
18+
FibonacciSubmitTask task = new FibonacciSubmitTask(n);
19+
this.executor.execute(task);
20+
return task.result.get();
21+
}
22+
23+
public class FibonacciSubmitTask implements Runnable {
24+
private final long n;
25+
private final CompletableFuture<Long> result;
26+
27+
public FibonacciSubmitTask(long n) {
28+
this.n = n;
29+
this.result = new CompletableFuture<>();
30+
}
31+
32+
@WithSpan("compute")
33+
public void run() {
34+
if (this.n <= 1) {
35+
this.result.complete(this.n);
36+
return;
37+
}
38+
FibonacciSubmitTask task1 = new FibonacciSubmitTask(this.n - 1);
39+
FibonacciSubmitTask task2 = new FibonacciSubmitTask(this.n - 2);
40+
executor.submit(task1);
41+
executor.submit(task2);
42+
try {
43+
this.result.complete(task1.result.get() + task2.result.get());
44+
} catch (InterruptedException | ExecutionException e) {
45+
this.result.completeExceptionally(e);
46+
}
47+
}
48+
}
49+
50+
@Override
51+
public void close() {
52+
this.executor.shutdown();
53+
}
54+
}

0 commit comments

Comments
 (0)