diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy index b833afe20df..ff98f1c883f 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy @@ -1,3 +1,4 @@ +import com.google.common.util.concurrent.MoreExecutors import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.DDSpanTypes import datadog.trace.bootstrap.instrumentation.api.Tags @@ -11,6 +12,7 @@ import io.grpc.inprocess.InProcessServerBuilder import io.grpc.stub.StreamObserver import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference @@ -62,7 +64,9 @@ class GrpcStreamingTest extends AgentTestRunner { } } } - Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start() + Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter) + .executor(directExecutor ? MoreExecutors.directExecutor() : Executors.newCachedThreadPool()) + .build().start() ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build() GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady() @@ -178,12 +182,17 @@ class GrpcStreamingTest extends AgentTestRunner { server?.shutdownNow()?.awaitTermination() where: - name | clientMessageCount | serverMessageCount - "A" | 1 | 1 - "B" | 2 | 1 - "C" | 1 | 2 - "D" | 2 | 2 - "E" | 3 | 3 + name | clientMessageCount | serverMessageCount | directExecutor + "A" | 1 | 1 | false + "B" | 2 | 1 | false + "C" | 1 | 2 | false + "D" | 2 | 2 | false + "E" | 3 | 3 | false + "A" | 1 | 1 | true + "B" | 2 | 1 | true + "C" | 1 | 2 | true + "D" | 2 | 2 | true + "E" | 3 | 3 | true clientRange = 1..clientMessageCount serverRange = 1..serverMessageCount diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java index db707e9e3e2..89428cd69d9 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java @@ -45,9 +45,6 @@ public AbstractExecutorInstrumentation(final String... additionalNames) { PERMITTED_EXECUTORS_PREFIXES = Collections.emptyList(); } else { final String[] whitelist = { - "com.google.common.util.concurrent.AbstractListeningExecutorService", - "com.google.common.util.concurrent.MoreExecutors$ListeningDecorator", - "com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator", "io.netty.channel.epoll.EpollEventLoop", "io.netty.channel.epoll.EpollEventLoopGroup", "io.netty.channel.MultithreadEventLoopGroup", @@ -64,8 +61,6 @@ public AbstractExecutorInstrumentation(final String... additionalNames) { "io.netty.util.concurrent.SingleThreadEventExecutor", "java.util.concurrent.AbstractExecutorService", "java.util.concurrent.CompletableFuture$ThreadPerTaskExecutor", - "java.util.concurrent.Executors$DelegatedExecutorService", - "java.util.concurrent.Executors$FinalizableDelegatedExecutorService", "java.util.concurrent.ThreadPoolExecutor", "kotlinx.coroutines.scheduling.CoroutineScheduler", "org.eclipse.jetty.util.thread.QueuedThreadPool", diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/RejectedExecutionHandlerInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/RejectedExecutionHandlerInstrumentation.java index 3cb6ec8141f..865c7036bba 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/RejectedExecutionHandlerInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/RejectedExecutionHandlerInstrumentation.java @@ -1,8 +1,10 @@ package datadog.trace.instrumentation.java.concurrent; import static datadog.trace.agent.tooling.bytebuddy.matcher.DDElementMatchers.hasInterface; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.cancelTask; import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; @@ -34,7 +36,8 @@ public ElementMatcher typeMatcher() { "java.util.concurrent.ThreadPoolExecutor$DiscardPolicy", "java.util.concurrent.ThreadPoolExecutor$DiscardOldestPolicy", "java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy") - .or(hasInterface(named("java.util.concurrent.RejectedExecutionHandler"))); + .or(hasInterface(named("java.util.concurrent.RejectedExecutionHandler"))) + .or(hasInterface(nameEndsWith("netty.util.concurrent.RejectedExecutionHandler"))); } @Override @@ -50,9 +53,10 @@ public Map contextStore() { public Map, String> transformers() { return Collections.singletonMap( isMethod() - .and(named("rejectedExecution")) - .and(takesArgument(0, named("java.lang.Runnable"))) - .and(takesArgument(1, named("java.util.concurrent.ThreadPoolExecutor"))), + // JDK or netty + .and(namedOneOf("rejectedExecution", "rejected")) + // must not constrain or use second parameter + .and(takesArgument(0, named("java.lang.Runnable"))), getClass().getName() + "$Reject"); } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/CrossedContextTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/CrossedContextTest.groovy new file mode 100644 index 00000000000..eeed2e22bb8 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/CrossedContextTest.groovy @@ -0,0 +1,141 @@ +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.core.DDSpan +import io.netty.channel.DefaultEventLoopGroup +import io.netty.channel.ThreadPerChannelEventLoop +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.oio.OioEventLoopGroup +import io.netty.util.concurrent.DefaultEventExecutor +import org.apache.tomcat.util.threads.TaskQueue +import spock.lang.Shared + +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.ForkJoinPool +import java.util.concurrent.ThreadFactory +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +class CrossedContextTest extends AgentTestRunner { + + @Shared + def executeRunnable = { e, c -> e.execute((Runnable) c) } + @Shared + def submitRunnable = { e, c -> e.submit((Runnable) c) } + + // this pool is not what's being tested + @Shared + ExecutorService pool = Executors.newFixedThreadPool(40, new ThreadFactory() { + AtomicInteger i = new AtomicInteger() + + @Override + Thread newThread(Runnable r) { + Thread t = new Thread(r) + t.setName("parent-" + i.getAndIncrement()) + return t + } + }) + + def cleanupSpec() { + pool.shutdownNow() + } + + def "concurrent #action are traced with correct lineage in #executor"() { + when: + def sut = executor + def fn = function + int taskCount = 200 + for (int i = 0; i < taskCount; ++i) { + pool.execute({ + String threadName = Thread.currentThread().getName() + runUnderTrace(threadName) { + fn(sut, new Descendant(threadName)) + } + }) + } + + TEST_WRITER.waitForTraces(taskCount) + then: + for (List trace : TEST_WRITER) { + assert trace.size() == 2 + DDSpan parent = trace.find({ it.isRootSpan() }) + assert null != parent + DDSpan child = trace.find({ it.getParentId() == parent.getSpanId() }) + assert null != child + assert child.getOperationName().toString().startsWith(parent.getOperationName().toString()) + } + + cleanup: + executor.shutdownNow() + + where: + executor | action | function + new ForkJoinPool() | "submission" | submitRunnable + new ForkJoinPool(10) | "submission" | submitRunnable + Executors.newSingleThreadExecutor() | "submission" | submitRunnable + Executors.newSingleThreadScheduledExecutor() | "submission" | submitRunnable + Executors.newFixedThreadPool(10) | "submission" | submitRunnable + Executors.newScheduledThreadPool(10) | "submission" | submitRunnable + Executors.newCachedThreadPool() | "submission" | submitRunnable + new DefaultEventLoopGroup(10) | "submission" | submitRunnable + new DefaultEventLoopGroup(1).next() | "submission" | submitRunnable + // TODO - flaky - seems to be relying on PendingTrace flush + // new UnorderedThreadPoolEventExecutor(10) | "submission" | submitRunnable + new NioEventLoopGroup(10) | "submission" | submitRunnable + new DefaultEventExecutor() | "submission" | submitRunnable + new org.apache.tomcat.util.threads.ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new TaskQueue()) | "submission" | submitRunnable + new ForkJoinPool() | "execution" | executeRunnable + new ForkJoinPool(10) | "execution" | executeRunnable + Executors.newSingleThreadExecutor() | "execution" | executeRunnable + Executors.newSingleThreadScheduledExecutor() | "execution" | executeRunnable + Executors.newFixedThreadPool(10) | "execution" | executeRunnable + Executors.newScheduledThreadPool(10) | "execution" | executeRunnable + Executors.newCachedThreadPool() | "execution" | executeRunnable + new DefaultEventLoopGroup(10) | "execution" | executeRunnable + new DefaultEventLoopGroup(1).next() | "execution" | executeRunnable + // TODO - flaky - seems to be relying on PendingTrace flush + // new UnorderedThreadPoolEventExecutor(10) | "execution" | executeRunnable + new NioEventLoopGroup(10) | "execution" | executeRunnable + new DefaultEventExecutor() | "execution" | executeRunnable + new org.apache.tomcat.util.threads.ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new TaskQueue()) | "execution" | executeRunnable + } + + def "netty event loop internal executions in #executor are traced with correct lineage" () { + setup: + ExecutorService pool = executor + when: + + int taskCount = 200 + for (int i = 0; i < taskCount; ++i) { + pool.execute({ + String threadName = Thread.currentThread().getName() + runUnderTrace(threadName) { + pool.execute(new Descendant(threadName)) + } + }) + } + + TEST_WRITER.waitForTraces(taskCount) + then: + for (List trace : TEST_WRITER) { + assert trace.size() == 2 + DDSpan parent = trace.find({ it.isRootSpan() }) + assert null != parent + DDSpan child = trace.find({ it.getParentId() == parent.getSpanId() }) + assert null != child + assert child.getOperationName().toString().startsWith(parent.getOperationName().toString()) + } + + where: + executor << [ + new ThreadPerChannelEventLoop(new OioEventLoopGroup()), + new DefaultEventExecutor(), + new NioEventLoopGroup(10), + new DefaultEventLoopGroup(10) + // flaky + // new UnorderedThreadPoolEventExecutor(10) + ] + } + +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index a38444b1f59..4b3665fe8e1 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -179,6 +179,16 @@ class ExecutorInstrumentationTest extends AgentTestRunner { "invokeAny with timeout" | invokeAnyTimeout | new org.apache.tomcat.util.threads.ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new TaskQueue()) // guava + "execute Runnable" | executeRunnable | MoreExecutors.directExecutor() + + "execute Runnable" | executeRunnable | MoreExecutors.newDirectExecutorService() + "submit Runnable" | submitRunnable | MoreExecutors.newDirectExecutorService() + "submit Callable" | submitCallable | MoreExecutors.newDirectExecutorService() + "invokeAll" | invokeAll | MoreExecutors.newDirectExecutorService() + "invokeAll with timeout" | invokeAllTimeout | MoreExecutors.newDirectExecutorService() + "invokeAny" | invokeAny | MoreExecutors.newDirectExecutorService() + "invokeAny with timeout" | invokeAnyTimeout | MoreExecutors.newDirectExecutorService() + "execute Runnable" | executeRunnable | MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()) "submit Runnable" | submitRunnable | MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()) "submit Callable" | submitCallable | MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()) @@ -250,12 +260,6 @@ class ExecutorInstrumentationTest extends AgentTestRunner { activeScope().setAsyncPropagation(true) try { for (int i = 0; i < 20; ++i) { - // Our current instrumentation instrumentation does not behave very well - // if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned' - // child traces sometimes since state can contain only one continuation - and - // we do not really have a good way for attributing work to correct parent span - // if we reuse Callable/Runnable. - // Solution for now is to never reuse a Callable/Runnable. final JavaAsyncChild child = new JavaAsyncChild(false, true) children.add(child) try { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/NettyExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/NettyExecutorInstrumentationTest.groovy index 5a1cc1768bb..1c7939b9ff4 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/NettyExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/NettyExecutorInstrumentationTest.groovy @@ -83,6 +83,26 @@ class NettyExecutorInstrumentationTest extends AgentTestRunner { where: name | method | poolImpl +// TODO flaky +// "execute Runnable" | executeRunnable | new UnorderedThreadPoolEventExecutor(1) +// "submit Runnable" | submitRunnable | new UnorderedThreadPoolEventExecutor(1) +// "submit Callable" | submitCallable | new UnorderedThreadPoolEventExecutor(1) +// "invokeAll" | invokeAll | new UnorderedThreadPoolEventExecutor(1) +// "invokeAll with timeout" | invokeAllTimeout | new UnorderedThreadPoolEventExecutor(1) +// "invokeAny" | invokeAny | new UnorderedThreadPoolEventExecutor(1) +// "invokeAny with timeout" | invokeAnyTimeout | new UnorderedThreadPoolEventExecutor(1) +// "schedule Runnable" | scheduleRunnable | new UnorderedThreadPoolEventExecutor(1) +// "schedule Callable" | scheduleCallable | new UnorderedThreadPoolEventExecutor(1) + + "execute Runnable" | executeRunnable | defaultEventExecutorGroup + "submit Runnable" | submitRunnable | defaultEventExecutorGroup + "submit Callable" | submitCallable | defaultEventExecutorGroup + "invokeAll" | invokeAll | defaultEventExecutorGroup + "invokeAll with timeout" | invokeAllTimeout | defaultEventExecutorGroup + "invokeAny" | invokeAny | defaultEventExecutorGroup + "invokeAny with timeout" | invokeAnyTimeout | defaultEventExecutorGroup + "schedule Runnable" | scheduleRunnable | defaultEventExecutorGroup + "schedule Callable" | scheduleCallable | defaultEventExecutorGroup "execute Runnable" | executeRunnable | defaultEventExecutorGroup.next() "submit Runnable" | submitRunnable | defaultEventExecutorGroup.next() @@ -104,6 +124,16 @@ class NettyExecutorInstrumentationTest extends AgentTestRunner { "schedule Runnable" | scheduleRunnable | defaultEventLoopGroup.next() "schedule Callable" | scheduleCallable | defaultEventLoopGroup.next() + "execute Runnable" | executeRunnable | defaultEventLoopGroup + "submit Runnable" | submitRunnable | defaultEventLoopGroup + "submit Callable" | submitCallable | defaultEventLoopGroup + "invokeAll" | invokeAll | defaultEventLoopGroup + "invokeAll with timeout" | invokeAllTimeout | defaultEventLoopGroup + "invokeAny" | invokeAny | defaultEventLoopGroup + "invokeAny with timeout" | invokeAnyTimeout | defaultEventLoopGroup + "schedule Runnable" | scheduleRunnable | defaultEventLoopGroup + "schedule Callable" | scheduleCallable | defaultEventLoopGroup + "execute Runnable" | executeRunnable | nioEventLoopGroup.next() "submit Runnable" | submitRunnable | nioEventLoopGroup.next() "submit Callable" | submitCallable | nioEventLoopGroup.next() @@ -114,6 +144,16 @@ class NettyExecutorInstrumentationTest extends AgentTestRunner { "schedule Runnable" | scheduleRunnable | nioEventLoopGroup.next() "schedule Callable" | scheduleCallable | nioEventLoopGroup.next() + "execute Runnable" | executeRunnable | nioEventLoopGroup + "submit Runnable" | submitRunnable | nioEventLoopGroup + "submit Callable" | submitCallable | nioEventLoopGroup + "invokeAll" | invokeAll | nioEventLoopGroup + "invokeAll with timeout" | invokeAllTimeout | nioEventLoopGroup + "invokeAny" | invokeAny | nioEventLoopGroup + "invokeAny with timeout" | invokeAnyTimeout | nioEventLoopGroup + "schedule Runnable" | scheduleRunnable | nioEventLoopGroup + "schedule Callable" | scheduleCallable | nioEventLoopGroup + "execute Runnable" | executeRunnable | epollExecutor() "submit Runnable" | submitRunnable | epollExecutor() "submit Callable" | submitCallable | epollExecutor() @@ -124,6 +164,16 @@ class NettyExecutorInstrumentationTest extends AgentTestRunner { "schedule Runnable" | scheduleRunnable | epollExecutor() "schedule Callable" | scheduleCallable | epollExecutor() + "execute Runnable" | executeRunnable | epollEventLoopGroup + "submit Runnable" | submitRunnable | epollEventLoopGroup + "submit Callable" | submitCallable | epollEventLoopGroup + "invokeAll" | invokeAll | epollEventLoopGroup + "invokeAll with timeout" | invokeAllTimeout | epollEventLoopGroup + "invokeAny" | invokeAny | epollEventLoopGroup + "invokeAny with timeout" | invokeAnyTimeout | epollEventLoopGroup + "schedule Runnable" | scheduleRunnable | epollEventLoopGroup + "schedule Callable" | scheduleCallable | epollEventLoopGroup + // ignore deprecation "execute Runnable" | executeRunnable | localEventLoopGroup.next() "submit Runnable" | submitRunnable | localEventLoopGroup.next() @@ -135,6 +185,16 @@ class NettyExecutorInstrumentationTest extends AgentTestRunner { "schedule Runnable" | scheduleRunnable | localEventLoopGroup.next() "schedule Callable" | scheduleCallable | localEventLoopGroup.next() + "execute Runnable" | executeRunnable | localEventLoopGroup + "submit Runnable" | submitRunnable | localEventLoopGroup + "submit Callable" | submitCallable | localEventLoopGroup + "invokeAll" | invokeAll | localEventLoopGroup + "invokeAll with timeout" | invokeAllTimeout | localEventLoopGroup + "invokeAny" | invokeAny | localEventLoopGroup + "invokeAny with timeout" | invokeAnyTimeout | localEventLoopGroup + "schedule Runnable" | scheduleRunnable | localEventLoopGroup + "schedule Callable" | scheduleCallable | localEventLoopGroup + } def "#poolImpl '#name' reports after canceled jobs"() { @@ -181,6 +241,11 @@ class NettyExecutorInstrumentationTest extends AgentTestRunner { where: name | method | poolImpl +// TODO flaky +// "submit Runnable" | submitRunnable | new UnorderedThreadPoolEventExecutor(1) +// "submit Callable" | submitCallable | new UnorderedThreadPoolEventExecutor(1) +// "schedule Runnable" | scheduleRunnable | new UnorderedThreadPoolEventExecutor(1) +// "schedule Callable" | scheduleCallable | new UnorderedThreadPoolEventExecutor(1) "submit Runnable" | submitRunnable | defaultEventExecutorGroup.next() "submit Callable" | submitCallable | defaultEventExecutorGroup.next() diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/RecursiveThreadPoolPropagationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/RecursiveThreadPoolPropagationTest.groovy index 133df6ece5c..4354ac5d438 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/RecursiveThreadPoolPropagationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/RecursiveThreadPoolPropagationTest.groovy @@ -275,6 +275,64 @@ class RecursiveThreadPoolPropagationTest extends AgentTestRunner { 5 | 2 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism).next() 5 | 3 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism).next() 5 | 4 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism).next() + + 1 | 2 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 1 | 3 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 1 | 4 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 2 | 1 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 2 | 2 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 2 | 3 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 2 | 4 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 3 | 1 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 3 | 2 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 3 | 3 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 3 | 4 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 4 | 1 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 4 | 2 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 4 | 3 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 4 | 4 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 5 | 1 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 5 | 2 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 5 | 3 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 5 | 4 | recursiveSubmission | new DefaultEventLoopGroup(parallelism) + 1 | 2 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 1 | 3 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 1 | 4 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 2 | 1 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 2 | 2 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 2 | 3 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 2 | 4 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 3 | 1 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 3 | 2 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 3 | 3 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 3 | 4 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 4 | 1 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 4 | 2 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 4 | 3 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 4 | 4 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 5 | 1 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 5 | 2 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 5 | 3 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 5 | 4 | recursiveExecution | new DefaultEventLoopGroup(parallelism) + 1 | 2 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 1 | 3 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 1 | 4 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 2 | 1 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 2 | 2 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 2 | 3 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 2 | 4 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 3 | 1 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 3 | 2 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 3 | 3 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 3 | 4 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 4 | 1 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 4 | 2 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 4 | 3 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 4 | 4 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 5 | 1 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 5 | 2 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 5 | 3 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) + 5 | 4 | mixedSubmissionAndExecution | new DefaultEventLoopGroup(parallelism) } private static void assertTrace(int depth) { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/RejectedExecutionTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/RejectedExecutionTest.groovy index 79642840a72..55c891acc96 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/RejectedExecutionTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/RejectedExecutionTest.groovy @@ -1,6 +1,8 @@ import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.DDId import datadog.trace.core.DDSpan +import io.netty.util.concurrent.DefaultEventExecutor +import io.netty.util.concurrent.DefaultThreadFactory import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.CountDownLatch @@ -134,20 +136,7 @@ class RejectedExecutionTest extends AgentTestRunner { testClosure() then: - TEST_WRITER.waitForTraces(1) - TEST_WRITER.size() == 1 - List trace = TEST_WRITER.get(0) - trace.size() == 2 - DDSpan parent = trace.find { - it.getOperationName() == "parent" - } - DDSpan child = trace.find { - it.getOperationName() == "asyncChild" - } - parent != null - child != null - parent.getParentId() == DDId.ZERO - parent.getSpanId() == child.getParentId() + expectParentChildTrace() where: @@ -164,6 +153,54 @@ class RejectedExecutionTest extends AgentTestRunner { ] } + def "trace reported with swallowing netty rejected execution handler" () { + setup: + DefaultEventExecutor executor = new DefaultEventExecutor(null, new DefaultThreadFactory(DefaultEventExecutor), + 1, handler) + CountDownLatch latch = new CountDownLatch(1) + // this task will block the executor thread ensuring the next task gets enqueued + executor.submit({ + latch.await() + }) + // this will sit in the queue + executor.submit({}) + + when: + runUnderTrace("parent") { + activeScope().setAsyncPropagation(true) + // must be rejected because the queue will be full until some + // time after the first task is released + executor.submit((Runnable) new JavaAsyncChild(true, false)) + latch.countDown() + } + + then: + expectParentChildTrace() + + where: + handler << [ + new SwallowingRejectedExecutionHandler() + ] + } + + def expectParentChildTrace() { + TEST_WRITER.waitForTraces(1) + TEST_WRITER.size() == 1 + List trace = TEST_WRITER.get(0) + assert trace.size() == 2 + DDSpan parent = trace.find { + it.getOperationName() == "parent" + } + DDSpan child = trace.find { + it.getOperationName() == "asyncChild" + } + assert parent != null + assert child != null + assert parent.getParentId() == DDId.ZERO + assert parent.getSpanId() == child.getParentId() + return true + } + def setupShutdownExecutor(RejectedExecutionHandler rejectedExecutionHandler) { ExecutorService pool = new ThreadPoolExecutor(1, 1, diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/java/Descendant.java b/dd-java-agent/instrumentation/java-concurrent/src/test/java/Descendant.java new file mode 100644 index 00000000000..24ad0994bda --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/java/Descendant.java @@ -0,0 +1,24 @@ +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.context.TraceScope; + +public final class Descendant implements Runnable { + + private final String parent; + + public Descendant(String parent) { + this.parent = parent; + } + + @Override + public void run() { + AgentSpan span = startSpan(parent + "-child"); + try (TraceScope scope = activateSpan(span)) { + + } finally { + span.finish(); + } + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/java/SwallowingRejectedExecutionHandler.java b/dd-java-agent/instrumentation/java-concurrent/src/test/java/SwallowingRejectedExecutionHandler.java index 20c3f694fb1..dd65c230046 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/java/SwallowingRejectedExecutionHandler.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/java/SwallowingRejectedExecutionHandler.java @@ -1,7 +1,12 @@ +import io.netty.util.concurrent.SingleThreadEventExecutor; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; -public class SwallowingRejectedExecutionHandler implements RejectedExecutionHandler { +public class SwallowingRejectedExecutionHandler + implements RejectedExecutionHandler, io.netty.util.concurrent.RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {} + + @Override + public void rejected(Runnable task, SingleThreadEventExecutor executor) {} }