Skip to content

more executor tests #2002

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import com.google.common.util.concurrent.MoreExecutors
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.Tags
Expand All @@ -11,6 +12,7 @@ import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.stub.StreamObserver

import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

Expand Down Expand Up @@ -62,7 +64,9 @@ class GrpcStreamingTest extends AgentTestRunner {
}
}
}
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start()
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter)
.executor(directExecutor ? MoreExecutors.directExecutor() : Executors.newCachedThreadPool())
.build().start()

ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build()
GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady()
Expand Down Expand Up @@ -178,12 +182,17 @@ class GrpcStreamingTest extends AgentTestRunner {
server?.shutdownNow()?.awaitTermination()

where:
name | clientMessageCount | serverMessageCount
"A" | 1 | 1
"B" | 2 | 1
"C" | 1 | 2
"D" | 2 | 2
"E" | 3 | 3
name | clientMessageCount | serverMessageCount | directExecutor
"A" | 1 | 1 | false
"B" | 2 | 1 | false
"C" | 1 | 2 | false
"D" | 2 | 2 | false
"E" | 3 | 3 | false
"A" | 1 | 1 | true
"B" | 2 | 1 | true
"C" | 1 | 2 | true
"D" | 2 | 2 | true
"E" | 3 | 3 | true

clientRange = 1..clientMessageCount
serverRange = 1..serverMessageCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ public AbstractExecutorInstrumentation(final String... additionalNames) {
PERMITTED_EXECUTORS_PREFIXES = Collections.emptyList();
} else {
final String[] whitelist = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The executors here all just wrap other executors

"com.google.common.util.concurrent.AbstractListeningExecutorService",
"com.google.common.util.concurrent.MoreExecutors$ListeningDecorator",
"com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator",
"io.netty.channel.epoll.EpollEventLoop",
"io.netty.channel.epoll.EpollEventLoopGroup",
"io.netty.channel.MultithreadEventLoopGroup",
Expand All @@ -64,8 +61,6 @@ public AbstractExecutorInstrumentation(final String... additionalNames) {
"io.netty.util.concurrent.SingleThreadEventExecutor",
"java.util.concurrent.AbstractExecutorService",
"java.util.concurrent.CompletableFuture$ThreadPerTaskExecutor",
"java.util.concurrent.Executors$DelegatedExecutorService",
"java.util.concurrent.Executors$FinalizableDelegatedExecutorService",
"java.util.concurrent.ThreadPoolExecutor",
"kotlinx.coroutines.scheduling.CoroutineScheduler",
"org.eclipse.jetty.util.thread.QueuedThreadPool",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package datadog.trace.instrumentation.java.concurrent;

import static datadog.trace.agent.tooling.bytebuddy.matcher.DDElementMatchers.hasInterface;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.cancelTask;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

Expand Down Expand Up @@ -34,7 +36,8 @@ public ElementMatcher<? super TypeDescription> typeMatcher() {
"java.util.concurrent.ThreadPoolExecutor$DiscardPolicy",
"java.util.concurrent.ThreadPoolExecutor$DiscardOldestPolicy",
"java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy")
.or(hasInterface(named("java.util.concurrent.RejectedExecutionHandler")));
.or(hasInterface(named("java.util.concurrent.RejectedExecutionHandler")))
.or(hasInterface(nameEndsWith("netty.util.concurrent.RejectedExecutionHandler")));
}

@Override
Expand All @@ -50,9 +53,10 @@ public Map<String, String> contextStore() {
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return Collections.singletonMap(
isMethod()
.and(named("rejectedExecution"))
.and(takesArgument(0, named("java.lang.Runnable")))
.and(takesArgument(1, named("java.util.concurrent.ThreadPoolExecutor"))),
// JDK or netty
.and(namedOneOf("rejectedExecution", "rejected"))
// must not constrain or use second parameter
.and(takesArgument(0, named("java.lang.Runnable"))),
getClass().getName() + "$Reject");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.core.DDSpan
import io.netty.channel.DefaultEventLoopGroup
import io.netty.channel.ThreadPerChannelEventLoop
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.oio.OioEventLoopGroup
import io.netty.util.concurrent.DefaultEventExecutor
import org.apache.tomcat.util.threads.TaskQueue
import spock.lang.Shared

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace

class CrossedContextTest extends AgentTestRunner {

@Shared
def executeRunnable = { e, c -> e.execute((Runnable) c) }
@Shared
def submitRunnable = { e, c -> e.submit((Runnable) c) }

// this pool is not what's being tested
@Shared
ExecutorService pool = Executors.newFixedThreadPool(40, new ThreadFactory() {
AtomicInteger i = new AtomicInteger()

@Override
Thread newThread(Runnable r) {
Thread t = new Thread(r)
t.setName("parent-" + i.getAndIncrement())
return t
}
})

def cleanupSpec() {
pool.shutdownNow()
}

def "concurrent #action are traced with correct lineage in #executor"() {
when:
def sut = executor
def fn = function
int taskCount = 200
for (int i = 0; i < taskCount; ++i) {
pool.execute({
String threadName = Thread.currentThread().getName()
runUnderTrace(threadName) {
fn(sut, new Descendant(threadName))
}
})
}

TEST_WRITER.waitForTraces(taskCount)
then:
for (List<DDSpan> trace : TEST_WRITER) {
assert trace.size() == 2
DDSpan parent = trace.find({ it.isRootSpan() })
assert null != parent
DDSpan child = trace.find({ it.getParentId() == parent.getSpanId() })
assert null != child
assert child.getOperationName().toString().startsWith(parent.getOperationName().toString())
}

cleanup:
executor.shutdownNow()

where:
executor | action | function
new ForkJoinPool() | "submission" | submitRunnable
new ForkJoinPool(10) | "submission" | submitRunnable
Executors.newSingleThreadExecutor() | "submission" | submitRunnable
Executors.newSingleThreadScheduledExecutor() | "submission" | submitRunnable
Executors.newFixedThreadPool(10) | "submission" | submitRunnable
Executors.newScheduledThreadPool(10) | "submission" | submitRunnable
Executors.newCachedThreadPool() | "submission" | submitRunnable
new DefaultEventLoopGroup(10) | "submission" | submitRunnable
new DefaultEventLoopGroup(1).next() | "submission" | submitRunnable
// TODO - flaky - seems to be relying on PendingTrace flush
// new UnorderedThreadPoolEventExecutor(10) | "submission" | submitRunnable
new NioEventLoopGroup(10) | "submission" | submitRunnable
new DefaultEventExecutor() | "submission" | submitRunnable
new org.apache.tomcat.util.threads.ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new TaskQueue()) | "submission" | submitRunnable
new ForkJoinPool() | "execution" | executeRunnable
new ForkJoinPool(10) | "execution" | executeRunnable
Executors.newSingleThreadExecutor() | "execution" | executeRunnable
Executors.newSingleThreadScheduledExecutor() | "execution" | executeRunnable
Executors.newFixedThreadPool(10) | "execution" | executeRunnable
Executors.newScheduledThreadPool(10) | "execution" | executeRunnable
Executors.newCachedThreadPool() | "execution" | executeRunnable
new DefaultEventLoopGroup(10) | "execution" | executeRunnable
new DefaultEventLoopGroup(1).next() | "execution" | executeRunnable
// TODO - flaky - seems to be relying on PendingTrace flush
// new UnorderedThreadPoolEventExecutor(10) | "execution" | executeRunnable
new NioEventLoopGroup(10) | "execution" | executeRunnable
new DefaultEventExecutor() | "execution" | executeRunnable
new org.apache.tomcat.util.threads.ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new TaskQueue()) | "execution" | executeRunnable
}

def "netty event loop internal executions in #executor are traced with correct lineage" () {
setup:
ExecutorService pool = executor
when:

int taskCount = 200
for (int i = 0; i < taskCount; ++i) {
pool.execute({
String threadName = Thread.currentThread().getName()
runUnderTrace(threadName) {
pool.execute(new Descendant(threadName))
}
})
}

TEST_WRITER.waitForTraces(taskCount)
then:
for (List<DDSpan> trace : TEST_WRITER) {
assert trace.size() == 2
DDSpan parent = trace.find({ it.isRootSpan() })
assert null != parent
DDSpan child = trace.find({ it.getParentId() == parent.getSpanId() })
assert null != child
assert child.getOperationName().toString().startsWith(parent.getOperationName().toString())
}

where:
executor << [
new ThreadPerChannelEventLoop(new OioEventLoopGroup()),
new DefaultEventExecutor(),
new NioEventLoopGroup(10),
new DefaultEventLoopGroup(10)
// flaky
// new UnorderedThreadPoolEventExecutor(10)
]
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
"invokeAny with timeout" | invokeAnyTimeout | new org.apache.tomcat.util.threads.ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new TaskQueue())

// guava
"execute Runnable" | executeRunnable | MoreExecutors.directExecutor()

"execute Runnable" | executeRunnable | MoreExecutors.newDirectExecutorService()
"submit Runnable" | submitRunnable | MoreExecutors.newDirectExecutorService()
"submit Callable" | submitCallable | MoreExecutors.newDirectExecutorService()
"invokeAll" | invokeAll | MoreExecutors.newDirectExecutorService()
"invokeAll with timeout" | invokeAllTimeout | MoreExecutors.newDirectExecutorService()
"invokeAny" | invokeAny | MoreExecutors.newDirectExecutorService()
"invokeAny with timeout" | invokeAnyTimeout | MoreExecutors.newDirectExecutorService()

"execute Runnable" | executeRunnable | MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())
"submit Runnable" | submitRunnable | MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())
"submit Callable" | submitCallable | MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())
Expand Down Expand Up @@ -250,12 +260,6 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
activeScope().setAsyncPropagation(true)
try {
for (int i = 0; i < 20; ++i) {
// Our current instrumentation instrumentation does not behave very well
// if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned'
// child traces sometimes since state can contain only one continuation - and
// we do not really have a good way for attributing work to correct parent span
// if we reuse Callable/Runnable.
// Solution for now is to never reuse a Callable/Runnable.
final JavaAsyncChild child = new JavaAsyncChild(false, true)
children.add(child)
try {
Expand Down
Loading