diff --git a/dd-smoke-tests/concurrent/build.gradle b/dd-smoke-tests/concurrent/build.gradle new file mode 100644 index 00000000000..b95668127e6 --- /dev/null +++ b/dd-smoke-tests/concurrent/build.gradle @@ -0,0 +1,32 @@ +plugins { + id 'java' + id 'application' + id 'com.github.johnrengelman.shadow' +} + +apply from: "$rootDir/gradle/java.gradle" + +description = 'Concurrent Integration Tests.' + +application { + mainClassName = 'datadog.smoketest.concurrent.ConcurrentApp' +} + +dependencies { + implementation('io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:2.13.1') + implementation project(':dd-trace-api') + testImplementation project(':dd-smoke-tests') + + testImplementation platform('org.junit:junit-bom:5.10.0') + testImplementation 'org.junit.jupiter:junit-jupiter' +} + +test { + useJUnitPlatform() +} + +tasks.withType(Test).configureEach { + dependsOn "shadowJar" + + jvmArgs "-Ddatadog.smoketest.shadowJar.path=${tasks.shadowJar.archiveFile.get()}" +} diff --git a/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/ConcurrentApp.java b/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/ConcurrentApp.java new file mode 100644 index 00000000000..c17d84a8887 --- /dev/null +++ b/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/ConcurrentApp.java @@ -0,0 +1,27 @@ +package datadog.smoketest.concurrent; + +import io.opentelemetry.instrumentation.annotations.WithSpan; +import java.util.concurrent.ExecutionException; + +public class ConcurrentApp { + @WithSpan("main") + public static void main(String[] args) { + // Calculate fibonacci using concurrent strategies + for (String arg : args) { + try (FibonacciCalculator calc = getCalculator(arg)) { + calc.computeFibonacci(10); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException("Failed to compute fibonacci number.", e); + } + } + } + + private static FibonacciCalculator getCalculator(String name) { + if (name.equalsIgnoreCase("executorService")) { + return new DemoExecutorService(); + } else if (name.equalsIgnoreCase("forkJoin")) { + return new DemoForkJoin(); + } + throw new IllegalArgumentException("Unknown calculator: " + name); + } +} diff --git a/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/DemoExecutorService.java b/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/DemoExecutorService.java new file mode 100644 index 00000000000..2de2b76ccf3 --- /dev/null +++ b/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/DemoExecutorService.java @@ -0,0 +1,53 @@ +package datadog.smoketest.concurrent; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import io.opentelemetry.instrumentation.annotations.WithSpan; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class DemoExecutorService implements FibonacciCalculator { + private final ExecutorService executorService; + + public DemoExecutorService() { + executorService = Executors.newFixedThreadPool(10); + } + + @WithSpan("compute") + @Override + public long computeFibonacci(int n) throws ExecutionException, InterruptedException { + Future future = executorService.submit(new FibonacciTask(n)); + return future.get(); + } + + private class FibonacciTask implements Callable { + private final int n; + + public FibonacciTask(int n) { + this.n = n; + } + + @Override + public Long call() throws ExecutionException, InterruptedException { + if (n <= 1) { + return (long) n; + } + return computeFibonacci(n - 1) + computeFibonacci(n - 2); + } + } + + @Override + public void close() { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(10, SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + } + } +} diff --git a/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/DemoForkJoin.java b/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/DemoForkJoin.java new file mode 100644 index 00000000000..dd77f86f324 --- /dev/null +++ b/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/DemoForkJoin.java @@ -0,0 +1,43 @@ +package datadog.smoketest.concurrent; + +import io.opentelemetry.instrumentation.annotations.WithSpan; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.RecursiveTask; + +public class DemoForkJoin implements FibonacciCalculator { + private final ForkJoinPool forkJoinPool; + + public DemoForkJoin() { + forkJoinPool = new ForkJoinPool(); + } + + @Override + public long computeFibonacci(int n) { + return forkJoinPool.invoke(new FibonacciTask(n)); + } + + private class FibonacciTask extends RecursiveTask { + private final int n; + + public FibonacciTask(int n) { + this.n = n; + } + + @WithSpan("compute") + @Override + protected Long compute() { + if (n <= 1) { + return (long) n; + } + FibonacciTask taskOne = new FibonacciTask(n - 1); + taskOne.fork(); + FibonacciTask taskTwo = new FibonacciTask(n - 2); + return taskTwo.compute() + taskOne.join(); + } + } + + @Override + public void close() { + forkJoinPool.shutdown(); + } +} diff --git a/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/FibonacciCalculator.java b/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/FibonacciCalculator.java new file mode 100644 index 00000000000..73974d7b135 --- /dev/null +++ b/dd-smoke-tests/concurrent/src/main/java/datadog/smoketest/concurrent/FibonacciCalculator.java @@ -0,0 +1,10 @@ +package datadog.smoketest.concurrent; + +import java.util.concurrent.ExecutionException; + +public interface FibonacciCalculator extends AutoCloseable { + long computeFibonacci(int n) throws ExecutionException, InterruptedException; + + @Override + void close(); +} diff --git a/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/AbstractDemoTest.groovy b/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/AbstractDemoTest.groovy new file mode 100644 index 00000000000..1628d0459df --- /dev/null +++ b/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/AbstractDemoTest.groovy @@ -0,0 +1,66 @@ +package datadog.smoketest + +import static java.util.concurrent.TimeUnit.SECONDS +import datadog.trace.test.agent.decoder.DecodedTrace +import java.util.function.Function + +abstract class AbstractDemoTest extends AbstractSmokeTest { + protected static final int TIMEOUT_SECS = 10 + protected abstract List getTestArguments() + + @Override + ProcessBuilder createProcessBuilder() { + def jarPath = System.getProperty("datadog.smoketest.shadowJar.path") + def command = new ArrayList() + command.add(javaPath()) + command.addAll(defaultJavaProperties) + command.add("-Ddd.trace.otel.enabled=true") + command.addAll(["-jar", jarPath]) + command.addAll(getTestArguments()) + + ProcessBuilder processBuilder = new ProcessBuilder(command) + processBuilder.directory(new File(buildDirectory)) + } + + @Override + Closure decodedTracesCallback() { + return {} // force traces decoding + } + + protected static Function checkTrace() { + return { + trace -> + // Check for 'main' span + def mainSpan = trace.spans.find { it.name == 'main' } + if (!mainSpan) { + return false + } + // Check that there are only 'main' and 'compute' spans + def otherSpans = trace.spans.findAll { it.name != 'main' && it.name != 'compute' } + if (!otherSpans.isEmpty()) { + return false + } + // Check that every 'compute' span is in the same trace and is either a child of the 'main' span or another 'compute' span + def computeSpans = trace.spans.findAll { it.name == 'compute' } + if (computeSpans.isEmpty()) { + return false + } + return computeSpans.every { + if (it.traceId != mainSpan.traceId) { + return false + } + if (it.parentId != mainSpan.spanId && trace.spans.find(s -> s.spanId == it.parentId).name != 'compute') { + return false + } + return true + } + } + } + + protected void receivedCorrectTrace() { + waitForTrace(defaultPoll, checkTrace()) + assert traceCount.get() == 1 + assert testedProcess.waitFor(TIMEOUT_SECS, SECONDS) + assert testedProcess.exitValue() == 0 + } +} diff --git a/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/DemoExecutorServiceTest.groovy b/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/DemoExecutorServiceTest.groovy new file mode 100644 index 00000000000..b99891b89bb --- /dev/null +++ b/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/DemoExecutorServiceTest.groovy @@ -0,0 +1,12 @@ +package datadog.smoketest + +class DemoExecutorServiceTest extends AbstractDemoTest { + protected List getTestArguments() { + return ["executorService"] + } + + def 'receive one correct trace when using ExecutorService'() { + expect: + receivedCorrectTrace() + } +} diff --git a/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/DemoForkJoinTest.groovy b/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/DemoForkJoinTest.groovy new file mode 100644 index 00000000000..00994a6ef4f --- /dev/null +++ b/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/DemoForkJoinTest.groovy @@ -0,0 +1,12 @@ +package datadog.smoketest + +class DemoForkJoinTest extends AbstractDemoTest { + protected List getTestArguments() { + return ["forkJoin"] + } + + def 'receive one correct trace when using ForkJoin'() { + expect: + receivedCorrectTrace() + } +} diff --git a/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/DemoMultipleConcurrenciesTest.groovy b/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/DemoMultipleConcurrenciesTest.groovy new file mode 100644 index 00000000000..06ec5dc67b2 --- /dev/null +++ b/dd-smoke-tests/concurrent/src/test/groovy/datadog/smoketest/DemoMultipleConcurrenciesTest.groovy @@ -0,0 +1,12 @@ +package datadog.smoketest + +class DemoMultipleConcurrenciesTest extends AbstractDemoTest { + protected List getTestArguments() { + return ["executorService", "forkJoin"] + } + + def 'receive one correct trace when using multiple concurrency strategies (ExecutorService and ForkJoin)'() { + expect: + receivedCorrectTrace() + } +} diff --git a/dd-smoke-tests/src/main/groovy/datadog/smoketest/AbstractSmokeTest.groovy b/dd-smoke-tests/src/main/groovy/datadog/smoketest/AbstractSmokeTest.groovy index 0d1192696f5..e5d1c70f019 100644 --- a/dd-smoke-tests/src/main/groovy/datadog/smoketest/AbstractSmokeTest.groovy +++ b/dd-smoke-tests/src/main/groovy/datadog/smoketest/AbstractSmokeTest.groovy @@ -50,6 +50,9 @@ abstract class AbstractSmokeTest extends ProcessManager { @Shared protected TestHttpServer.Headers lastTraceRequestHeaders = null + @Shared + protected final PollingConditions defaultPoll = new PollingConditions(timeout: 30, initialDelay: 0, delay: 1, factor: 1) + @Shared @AutoCleanup protected TestHttpServer server = httpServer { @@ -292,8 +295,7 @@ abstract class AbstractSmokeTest extends ProcessManager { } int waitForTraceCount(int count) { - def conditions = new PollingConditions(timeout: 30, initialDelay: 0, delay: 0.5, factor: 1) - return waitForTraceCount(count, conditions) + return waitForTraceCount(count, defaultPoll) } int waitForTraceCount(int count, PollingConditions conditions) { @@ -325,8 +327,7 @@ abstract class AbstractSmokeTest extends ProcessManager { } void waitForTelemetryCount(final int count) { - def conditions = new PollingConditions(timeout: 30, initialDelay: 0, delay: 1, factor: 1) - waitForTelemetryCount(conditions, count) + waitForTelemetryCount(defaultPoll, count) } void waitForTelemetryCount(final PollingConditions poll, final int count) { @@ -336,8 +337,7 @@ abstract class AbstractSmokeTest extends ProcessManager { } void waitForTelemetryFlat(final Function, Boolean> predicate) { - def conditions = new PollingConditions(timeout: 30, initialDelay: 0, delay: 1, factor: 1) - waitForTelemetryFlat(conditions, predicate) + waitForTelemetryFlat(defaultPoll, predicate) } void waitForTelemetryFlat(final PollingConditions poll, final Function, Boolean> predicate) { diff --git a/settings.gradle b/settings.gradle index c79530dd5ba..33318144af5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -96,6 +96,7 @@ include ':dd-smoke-tests:apm-tracing-disabled' include ':dd-smoke-tests:armeria-grpc' include ':dd-smoke-tests:backend-mock' include ':dd-smoke-tests:cli' +include ':dd-smoke-tests:concurrent' include ':dd-smoke-tests:crashtracking' include ':dd-smoke-tests:custom-systemloader' include ':dd-smoke-tests:dynamic-config' @@ -524,4 +525,3 @@ include ':dd-java-agent:benchmark' include ':dd-java-agent:benchmark-integration' include ':dd-java-agent:benchmark-integration:jetty-perftest' include ':dd-java-agent:benchmark-integration:play-perftest' -