Skip to content

Fix vertx worker propagation and error handling #8237

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public String[] knownMatchingTypes() {
"java.util.concurrent.AbstractExecutorService",
"org.glassfish.grizzly.threadpool.GrizzlyExecutorService",
"org.jboss.threads.EnhancedQueueExecutor",
"io.vertx.core.impl.WorkerExecutor",
};
}

Expand Down
10 changes: 10 additions & 0 deletions dd-java-agent/instrumentation/vertx-web-4.0/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ ext {
// unbound it for latest
latestDepTestMinJavaVersionForTests = JavaVersion.VERSION_11
latestDepForkedTestMinJavaVersionForTests = JavaVersion.VERSION_11
latest4xTestMaxJavaVersionForTests = JavaVersion.VERSION_25
latest4xForkedTestMaxJavaVersionForTests = JavaVersion.VERSION_25
latestDepTestMaxJavaVersionForTests = JavaVersion.VERSION_25
latestDepForkedTestMaxJavaVersionForTests = JavaVersion.VERSION_25
}
Expand All @@ -22,6 +24,8 @@ muzzle {

addTestSuiteForDir('latestDepTest', 'latestDepTest')
addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'latestDepTest')
addTestSuiteForDir('latest4xTest', 'test')
addTestSuiteExtendingForDir('latest4xForkedTest', 'latest4xTest', 'test')

configurations {
testArtifacts
Expand Down Expand Up @@ -50,6 +54,9 @@ dependencies {
testRuntimeOnly project(':dd-java-agent:instrumentation:jackson-core')
testRuntimeOnly project(':dd-java-agent:instrumentation:netty-buffer-4')

latest4xTestImplementation group: 'io.vertx', name: 'vertx-web', version: '4.+'
latest4xTestImplementation group: 'io.vertx', name: 'vertx-web-client', version: '4.+'

latestDepTestImplementation group: 'io.vertx', name: 'vertx-web', version: '+'
latestDepTestImplementation group: 'io.vertx', name: 'vertx-web-client', version: '+'
}
Expand All @@ -60,3 +67,6 @@ dependencies {
[compileLatestDepForkedTestGroovy, compileLatestDepTestGroovy].each {
it.javaLauncher = getJavaLauncherFor(11)
}
[latest4xForkedTest, latest4xTest].each {
it.jvmArgs += '-Dtest.dd.latest4xTest=true'
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,18 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {
}
}
}

class VertxHttpServerWorkerForkedTest extends VertxHttpServerForkedTest {
@Override
HttpServer server() {
return new VertxServer(verticle(), routerBasePath(), true)
}

@Override
boolean testBlocking() {
//FIXME: ASM
// on the worker the requests are dispatched through a queue.
// Despite the blocking works, we fails recording that blocking exception
false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import datadog.trace.agent.test.base.HttpServer
import io.vertx.core.AbstractVerticle
import io.vertx.core.DeploymentOptions
import io.vertx.core.ThreadingModel
import io.vertx.core.Vertx
import io.vertx.core.internal.VertxInternal
import io.vertx.core.json.JsonObject
Expand All @@ -13,11 +14,13 @@ class VertxServer implements HttpServer {
private VertxInternal server
private String routerBasePath
private port
private boolean useWorker
Class<AbstractVerticle> verticle

VertxServer(Class<AbstractVerticle> verticle, String routerBasePath) {
VertxServer(Class<AbstractVerticle> verticle, String routerBasePath, boolean useWorker = false) {
this.routerBasePath = routerBasePath
this.verticle = verticle
this.useWorker = useWorker
}

@Override
Expand All @@ -32,10 +35,14 @@ class VertxServer implements HttpServer {
future.complete(null)
})

server.deployVerticle(verticle.name,
new DeploymentOptions()
def deployOptions = new DeploymentOptions()
.setConfig(new JsonObject().put(VertxTestServer.CONFIG_HTTP_SERVER_PORT, 0))
.setInstances(1)).await()
.setInstances(1)

if (useWorker) {
deployOptions = deployOptions.setWorkerPoolSize(1).setThreadingModel(ThreadingModel.WORKER)
}
server.deployVerticle(verticle.name, deployOptions).await()

future.get()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static datadog.trace.bootstrap.instrumentation.httpurlconnection.HttpUrlConnectionDecorator.DECORATE;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
Expand All @@ -20,6 +21,7 @@
public class HttpClientRequestBaseInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
static final String[] CONCRETE_TYPES = {
"io.vertx.core.http.impl.HttpClientRequestBase",
"io.vertx.core.http.impl.HttpClientRequestImpl",
"io.vertx.core.http.impl.HttpClientRequestPushPromise"
};
Expand All @@ -37,7 +39,7 @@ public String[] helperClassNames() {
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod()
.and(isPackagePrivate())
.and(isPackagePrivate().or(isPrivate()))
.and(named("reset"))
.and(takesArgument(0, named("java.lang.Throwable"))),
HttpClientRequestBaseInstrumentation.class.getName() + "$ResetAdvice");
Expand All @@ -53,7 +55,7 @@ public static class ResetAdvice {
public static void onExit(
@Advice.Argument(value = 0) Throwable cause,
@Advice.FieldValue("stream") final HttpClientStream stream,
@Advice.Return(readOnly = false) boolean result) {
@Advice.Return boolean result) {
if (result) {
AgentSpan nettySpan =
stream.connection().channel().attr(AttributeKeys.SPAN_ATTRIBUTE_KEY).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import io.vertx.core.http.HttpClientOptions
import io.vertx.core.http.HttpClientResponse
import io.vertx.core.http.HttpMethod
import io.vertx.core.http.RequestOptions
import io.vertx.core.http.impl.NoStackTraceTimeoutException
import spock.lang.AutoCleanup
import spock.lang.Shared

Expand Down Expand Up @@ -97,14 +96,16 @@ class VertxHttpClientForkedTest extends HttpClientTest implements TestingNettyHt
status == 0
assertTraces(1) {
trace(size(1)) {
clientSpan(it, null, method, false, false, url, null, true, ex)
clientSpan(it, null, method, false, false, url, null, true, null, false,
["error.stack": { String },
"error.message": { String s -> s.startsWith("The timeout period of ${timeout}ms has been exceeded")},
"error.type": { String s -> s.endsWith("NoStackTraceTimeoutException")}])
}
}

where:
timeout = 1000
method = "GET"
url = server.address.resolve("/timeout")
ex = new NoStackTraceTimeoutException("The timeout period of ${timeout}ms has been exceeded while executing GET http://localhost:${url.port}/timeout for server localhost:${url.port}")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,26 @@ import io.vertx.core.json.JsonObject
import okhttp3.MediaType
import okhttp3.Request
import okhttp3.RequestBody
import spock.lang.Shared

import java.util.concurrent.CompletableFuture

abstract class IastVertxHttpServerTest extends IastSourcesTest<IastVertxServer> {

@Shared
boolean isVertxLatest4x = Boolean.getBoolean('test.dd.latest4xTest')

@Override
HttpServer server() {
return new IastVertxServer()
}

@Override
protected boolean ignoreParameters() {
//FIXME: ASM
return isVertxLatest4x
}

boolean isHttps() {
false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ import datadog.trace.instrumentation.netty41.server.NettyHttpServerDecorator
import datadog.trace.instrumentation.vertx_4_0.server.VertxDecorator
import io.vertx.core.AbstractVerticle
import io.vertx.core.Vertx
import spock.lang.Shared

class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {
@Shared
boolean isVertxLatest4x = Boolean.getBoolean('test.dd.latest4xTest')

@Override
HttpServer server() {
return new VertxServer(verticle(), routerBasePath())
Expand Down Expand Up @@ -64,7 +68,8 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {

@Override
boolean testRequestBody() {
true
//FIXME: not working on 4.x latest
!isVertxLatest4x
}

@Override
Expand All @@ -79,12 +84,20 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {

@Override
boolean testBodyJson() {
true
//FIXME: not working on 4.x latest
!isVertxLatest4x
}

@Override
boolean testBlocking() {
true
//FIXME: not working on 4.x latest
!isVertxLatest4x
}

@Override
boolean testEncodedQuery() {
//FIXME: not working on 4.x latest
!isVertxLatest4x
}

@Override
Expand Down Expand Up @@ -165,3 +178,18 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {
}
}
}

class VertxHttpServerWorkerForkedTest extends VertxHttpServerForkedTest {
@Override
HttpServer server() {
return new VertxServer(verticle(), routerBasePath(), true)
}

@Override
boolean testBlocking() {
//FIXME: ASM
// on the worker the requests are dispatched through a queue.
// Despite the blocking works, we fails recording that blocking exception
false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ class VertxServer implements HttpServer {
private String routerBasePath
private port
Class<AbstractVerticle> verticle
boolean useWorker

VertxServer(Class<AbstractVerticle> verticle, String routerBasePath) {
VertxServer(Class<AbstractVerticle> verticle, String routerBasePath, boolean useWorker = false) {
this.routerBasePath = routerBasePath
this.verticle = verticle
this.useWorker = useWorker
}

@Override
Expand All @@ -35,6 +37,8 @@ class VertxServer implements HttpServer {
server.deployVerticle(verticle.name,
new DeploymentOptions()
.setConfig(new JsonObject().put(VertxTestServer.CONFIG_HTTP_SERVER_PORT, 0))
.setWorkerPoolSize(1)
.setWorker(useWorker)
.setInstances(1)) { res ->
if (!res.succeeded()) {
throw new RuntimeException("Cannot deploy server Verticle", res.cause())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ public class VertxTestServer extends AbstractVerticle {
public static final String CONFIG_HTTP_SERVER_PORT = "http.server.port";
public static final String PORT_DATA_ADDRESS = "PORT_DATA";

int fibonacci(int n) {
if (n <= 1) {
return n;
}
return fibonacci(n - 1) + fibonacci(n - 2);
}
Comment on lines +42 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"can you evaluate the complexity of this code ?" 😂
(ofc totally ok as it's for a test)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOL..It's actually from the customer reproducer and it's useful to simulate some processing time


@Override
public void start(final Promise<Void> startPromise) {
final int port = config().getInteger(CONFIG_HTTP_SERVER_PORT);
Expand All @@ -53,8 +60,10 @@ public void start(final Promise<Void> startPromise) {
controller(
ctx,
SUCCESS,
() ->
ctx.response().setStatusCode(SUCCESS.getStatus()).end(SUCCESS.getBody())));
() -> {
fibonacci(40);
ctx.response().setStatusCode(SUCCESS.getStatus()).end(SUCCESS.getBody());
}));
router
.route(FORWARDED.getPath())
.handler(
Expand Down
Loading