Skip to content

Add smoke tests for java's concurrent API #8438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions dd-smoke-tests/concurrent/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
plugins {
id 'java'
id 'application'
id 'com.github.johnrengelman.shadow'
}

apply from: "$rootDir/gradle/java.gradle"

description = 'Concurrent Integration Tests.'

application {
mainClassName = 'datadog.smoketest.concurrent.ConcurrentApp'
}

dependencies {
implementation('io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:2.13.1')
implementation project(':dd-trace-api')
testImplementation project(':dd-smoke-tests')

testImplementation platform('org.junit:junit-bom:5.10.0')
testImplementation 'org.junit.jupiter:junit-jupiter'
}

test {
useJUnitPlatform()
}

tasks.withType(Test).configureEach {
dependsOn "shadowJar"

jvmArgs "-Ddatadog.smoketest.shadowJar.path=${tasks.shadowJar.archiveFile.get()}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package datadog.smoketest.concurrent;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.concurrent.ExecutionException;

public class ConcurrentApp {
@WithSpan("main")
public static void main(String[] args) {
// Calculate fibonacci using concurrent strategies
for (String arg : args) {
try (FibonacciCalculator calc = getCalculator(arg)) {
calc.computeFibonacci(10);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to compute fibonacci number.", e);
}
}
}

private static FibonacciCalculator getCalculator(String name) {
if (name.equalsIgnoreCase("executorService")) {
return new DemoExecutorService();
} else if (name.equalsIgnoreCase("forkJoin")) {
return new DemoForkJoin();
}
throw new IllegalArgumentException("Unknown calculator: " + name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package datadog.smoketest.concurrent;

import static java.util.concurrent.TimeUnit.SECONDS;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DemoExecutorService implements FibonacciCalculator {
private final ExecutorService executorService;

public DemoExecutorService() {
executorService = Executors.newFixedThreadPool(10);
}

@WithSpan("compute")
@Override
public long computeFibonacci(int n) throws ExecutionException, InterruptedException {
Future<Long> future = executorService.submit(new FibonacciTask(n));
return future.get();
}

private class FibonacciTask implements Callable<Long> {
private final int n;

public FibonacciTask(int n) {
this.n = n;
}

@Override
public Long call() throws ExecutionException, InterruptedException {
if (n <= 1) {
return (long) n;
}
return computeFibonacci(n - 1) + computeFibonacci(n - 2);
}
}

@Override
public void close() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(10, SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package datadog.smoketest.concurrent;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class DemoForkJoin implements FibonacciCalculator {
private final ForkJoinPool forkJoinPool;

public DemoForkJoin() {
forkJoinPool = new ForkJoinPool();
}

@Override
public long computeFibonacci(int n) {
return forkJoinPool.invoke(new FibonacciTask(n));
}

private class FibonacciTask extends RecursiveTask<Long> {
private final int n;

public FibonacciTask(int n) {
this.n = n;
}

@WithSpan("compute")
@Override
protected Long compute() {
if (n <= 1) {
return (long) n;
}
FibonacciTask taskOne = new FibonacciTask(n - 1);
taskOne.fork();
FibonacciTask taskTwo = new FibonacciTask(n - 2);
return taskTwo.compute() + taskOne.join();
}
}

@Override
public void close() {
forkJoinPool.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package datadog.smoketest.concurrent;

import java.util.concurrent.ExecutionException;

public interface FibonacciCalculator extends AutoCloseable {
long computeFibonacci(int n) throws ExecutionException, InterruptedException;

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package datadog.smoketest

import static java.util.concurrent.TimeUnit.SECONDS
import datadog.trace.test.agent.decoder.DecodedTrace
import java.util.function.Function

abstract class AbstractDemoTest extends AbstractSmokeTest {
protected static final int TIMEOUT_SECS = 10
protected abstract List<String> getTestArguments()

@Override
ProcessBuilder createProcessBuilder() {
def jarPath = System.getProperty("datadog.smoketest.shadowJar.path")
def command = new ArrayList<String>()
command.add(javaPath())
command.addAll(defaultJavaProperties)
command.add("-Ddd.trace.otel.enabled=true")
command.addAll(["-jar", jarPath])
command.addAll(getTestArguments())

ProcessBuilder processBuilder = new ProcessBuilder(command)
processBuilder.directory(new File(buildDirectory))
}

@Override
Closure decodedTracesCallback() {
return {} // force traces decoding
}

protected static Function<DecodedTrace, Boolean> checkTrace() {
return {
trace ->
// Check for 'main' span
def mainSpan = trace.spans.find { it.name == 'main' }
if (!mainSpan) {
return false
}
// Check that there are only 'main' and 'compute' spans
def otherSpans = trace.spans.findAll { it.name != 'main' && it.name != 'compute' }
if (!otherSpans.isEmpty()) {
return false
}
// Check that every 'compute' span is in the same trace and is either a child of the 'main' span or another 'compute' span
def computeSpans = trace.spans.findAll { it.name == 'compute' }
if (computeSpans.isEmpty()) {
return false
}
return computeSpans.every {
if (it.traceId != mainSpan.traceId) {
return false
}
if (it.parentId != mainSpan.spanId && trace.spans.find(s -> s.spanId == it.parentId).name != 'compute') {
return false
}
return true
}
}
}

protected void receivedCorrectTrace() {
waitForTrace(defaultPoll, checkTrace())
assert traceCount.get() == 1
assert testedProcess.waitFor(TIMEOUT_SECS, SECONDS)
assert testedProcess.exitValue() == 0
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package datadog.smoketest

class DemoExecutorServiceTest extends AbstractDemoTest {
protected List<String> getTestArguments() {
return ["executorService"]
}

def 'receive one correct trace when using ExecutorService'() {
expect:
receivedCorrectTrace()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package datadog.smoketest

class DemoForkJoinTest extends AbstractDemoTest {
protected List<String> getTestArguments() {
return ["forkJoin"]
}

def 'receive one correct trace when using ForkJoin'() {
expect:
receivedCorrectTrace()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package datadog.smoketest

class DemoMultipleConcurrenciesTest extends AbstractDemoTest {
protected List<String> getTestArguments() {
return ["executorService", "forkJoin"]
}

def 'receive one correct trace when using multiple concurrency strategies (ExecutorService and ForkJoin)'() {
expect:
receivedCorrectTrace()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ abstract class AbstractSmokeTest extends ProcessManager {
@Shared
protected TestHttpServer.Headers lastTraceRequestHeaders = null

@Shared
protected final PollingConditions defaultPoll = new PollingConditions(timeout: 30, initialDelay: 0, delay: 1, factor: 1)

@Shared
@AutoCleanup
protected TestHttpServer server = httpServer {
Expand Down Expand Up @@ -292,8 +295,7 @@ abstract class AbstractSmokeTest extends ProcessManager {
}

int waitForTraceCount(int count) {
def conditions = new PollingConditions(timeout: 30, initialDelay: 0, delay: 0.5, factor: 1)
return waitForTraceCount(count, conditions)
return waitForTraceCount(count, defaultPoll)
}

int waitForTraceCount(int count, PollingConditions conditions) {
Expand Down Expand Up @@ -325,8 +327,7 @@ abstract class AbstractSmokeTest extends ProcessManager {
}

void waitForTelemetryCount(final int count) {
def conditions = new PollingConditions(timeout: 30, initialDelay: 0, delay: 1, factor: 1)
waitForTelemetryCount(conditions, count)
waitForTelemetryCount(defaultPoll, count)
}

void waitForTelemetryCount(final PollingConditions poll, final int count) {
Expand All @@ -336,8 +337,7 @@ abstract class AbstractSmokeTest extends ProcessManager {
}

void waitForTelemetryFlat(final Function<Map<String, Object>, Boolean> predicate) {
def conditions = new PollingConditions(timeout: 30, initialDelay: 0, delay: 1, factor: 1)
waitForTelemetryFlat(conditions, predicate)
waitForTelemetryFlat(defaultPoll, predicate)
}

void waitForTelemetryFlat(final PollingConditions poll, final Function<Map<String, Object>, Boolean> predicate) {
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ include ':dd-smoke-tests:apm-tracing-disabled'
include ':dd-smoke-tests:armeria-grpc'
include ':dd-smoke-tests:backend-mock'
include ':dd-smoke-tests:cli'
include ':dd-smoke-tests:concurrent'
include ':dd-smoke-tests:crashtracking'
include ':dd-smoke-tests:custom-systemloader'
include ':dd-smoke-tests:dynamic-config'
Expand Down Expand Up @@ -524,4 +525,3 @@ include ':dd-java-agent:benchmark'
include ':dd-java-agent:benchmark-integration'
include ':dd-java-agent:benchmark-integration:jetty-perftest'
include ':dd-java-agent:benchmark-integration:play-perftest'