Skip to content

Commit 9a95234

Browse files
Merge pull request #2002 from DataDog/rgs/more-executor-tests
more executor tests
2 parents 19d6565 + 219b5e7 commit 9a95234

10 files changed

+379
-37
lines changed

dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy

+16-7
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import com.google.common.util.concurrent.MoreExecutors
12
import datadog.trace.agent.test.AgentTestRunner
23
import datadog.trace.api.DDSpanTypes
34
import datadog.trace.bootstrap.instrumentation.api.Tags
@@ -11,6 +12,7 @@ import io.grpc.inprocess.InProcessServerBuilder
1112
import io.grpc.stub.StreamObserver
1213

1314
import java.util.concurrent.CopyOnWriteArrayList
15+
import java.util.concurrent.Executors
1416
import java.util.concurrent.TimeUnit
1517
import java.util.concurrent.atomic.AtomicReference
1618

@@ -62,7 +64,9 @@ class GrpcStreamingTest extends AgentTestRunner {
6264
}
6365
}
6466
}
65-
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start()
67+
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter)
68+
.executor(directExecutor ? MoreExecutors.directExecutor() : Executors.newCachedThreadPool())
69+
.build().start()
6670

6771
ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build()
6872
GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady()
@@ -178,12 +182,17 @@ class GrpcStreamingTest extends AgentTestRunner {
178182
server?.shutdownNow()?.awaitTermination()
179183

180184
where:
181-
name | clientMessageCount | serverMessageCount
182-
"A" | 1 | 1
183-
"B" | 2 | 1
184-
"C" | 1 | 2
185-
"D" | 2 | 2
186-
"E" | 3 | 3
185+
name | clientMessageCount | serverMessageCount | directExecutor
186+
"A" | 1 | 1 | false
187+
"B" | 2 | 1 | false
188+
"C" | 1 | 2 | false
189+
"D" | 2 | 2 | false
190+
"E" | 3 | 3 | false
191+
"A" | 1 | 1 | true
192+
"B" | 2 | 1 | true
193+
"C" | 1 | 2 | true
194+
"D" | 2 | 2 | true
195+
"E" | 3 | 3 | true
187196

188197
clientRange = 1..clientMessageCount
189198
serverRange = 1..serverMessageCount

dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/AbstractExecutorInstrumentation.java

-5
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ public AbstractExecutorInstrumentation(final String... additionalNames) {
4545
PERMITTED_EXECUTORS_PREFIXES = Collections.emptyList();
4646
} else {
4747
final String[] whitelist = {
48-
"com.google.common.util.concurrent.AbstractListeningExecutorService",
49-
"com.google.common.util.concurrent.MoreExecutors$ListeningDecorator",
50-
"com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator",
5148
"io.netty.channel.epoll.EpollEventLoop",
5249
"io.netty.channel.epoll.EpollEventLoopGroup",
5350
"io.netty.channel.MultithreadEventLoopGroup",
@@ -64,8 +61,6 @@ public AbstractExecutorInstrumentation(final String... additionalNames) {
6461
"io.netty.util.concurrent.SingleThreadEventExecutor",
6562
"java.util.concurrent.AbstractExecutorService",
6663
"java.util.concurrent.CompletableFuture$ThreadPerTaskExecutor",
67-
"java.util.concurrent.Executors$DelegatedExecutorService",
68-
"java.util.concurrent.Executors$FinalizableDelegatedExecutorService",
6964
"java.util.concurrent.ThreadPoolExecutor",
7065
"kotlinx.coroutines.scheduling.CoroutineScheduler",
7166
"org.eclipse.jetty.util.thread.QueuedThreadPool",

dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/RejectedExecutionHandlerInstrumentation.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package datadog.trace.instrumentation.java.concurrent;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.DDElementMatchers.hasInterface;
4+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
45
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.cancelTask;
56
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
7+
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
68
import static net.bytebuddy.matcher.ElementMatchers.named;
79
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
810

@@ -34,7 +36,8 @@ public ElementMatcher<? super TypeDescription> typeMatcher() {
3436
"java.util.concurrent.ThreadPoolExecutor$DiscardPolicy",
3537
"java.util.concurrent.ThreadPoolExecutor$DiscardOldestPolicy",
3638
"java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy")
37-
.or(hasInterface(named("java.util.concurrent.RejectedExecutionHandler")));
39+
.or(hasInterface(named("java.util.concurrent.RejectedExecutionHandler")))
40+
.or(hasInterface(nameEndsWith("netty.util.concurrent.RejectedExecutionHandler")));
3841
}
3942

4043
@Override
@@ -50,9 +53,10 @@ public Map<String, String> contextStore() {
5053
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
5154
return Collections.singletonMap(
5255
isMethod()
53-
.and(named("rejectedExecution"))
54-
.and(takesArgument(0, named("java.lang.Runnable")))
55-
.and(takesArgument(1, named("java.util.concurrent.ThreadPoolExecutor"))),
56+
// JDK or netty
57+
.and(namedOneOf("rejectedExecution", "rejected"))
58+
// must not constrain or use second parameter
59+
.and(takesArgument(0, named("java.lang.Runnable"))),
5660
getClass().getName() + "$Reject");
5761
}
5862

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import datadog.trace.agent.test.AgentTestRunner
2+
import datadog.trace.core.DDSpan
3+
import io.netty.channel.DefaultEventLoopGroup
4+
import io.netty.channel.ThreadPerChannelEventLoop
5+
import io.netty.channel.nio.NioEventLoopGroup
6+
import io.netty.channel.oio.OioEventLoopGroup
7+
import io.netty.util.concurrent.DefaultEventExecutor
8+
import org.apache.tomcat.util.threads.TaskQueue
9+
import spock.lang.Shared
10+
11+
import java.util.concurrent.ExecutorService
12+
import java.util.concurrent.Executors
13+
import java.util.concurrent.ForkJoinPool
14+
import java.util.concurrent.ThreadFactory
15+
import java.util.concurrent.TimeUnit
16+
import java.util.concurrent.atomic.AtomicInteger
17+
18+
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
19+
20+
class CrossedContextTest extends AgentTestRunner {
21+
22+
@Shared
23+
def executeRunnable = { e, c -> e.execute((Runnable) c) }
24+
@Shared
25+
def submitRunnable = { e, c -> e.submit((Runnable) c) }
26+
27+
// this pool is not what's being tested
28+
@Shared
29+
ExecutorService pool = Executors.newFixedThreadPool(40, new ThreadFactory() {
30+
AtomicInteger i = new AtomicInteger()
31+
32+
@Override
33+
Thread newThread(Runnable r) {
34+
Thread t = new Thread(r)
35+
t.setName("parent-" + i.getAndIncrement())
36+
return t
37+
}
38+
})
39+
40+
def cleanupSpec() {
41+
pool.shutdownNow()
42+
}
43+
44+
def "concurrent #action are traced with correct lineage in #executor"() {
45+
when:
46+
def sut = executor
47+
def fn = function
48+
int taskCount = 200
49+
for (int i = 0; i < taskCount; ++i) {
50+
pool.execute({
51+
String threadName = Thread.currentThread().getName()
52+
runUnderTrace(threadName) {
53+
fn(sut, new Descendant(threadName))
54+
}
55+
})
56+
}
57+
58+
TEST_WRITER.waitForTraces(taskCount)
59+
then:
60+
for (List<DDSpan> trace : TEST_WRITER) {
61+
assert trace.size() == 2
62+
DDSpan parent = trace.find({ it.isRootSpan() })
63+
assert null != parent
64+
DDSpan child = trace.find({ it.getParentId() == parent.getSpanId() })
65+
assert null != child
66+
assert child.getOperationName().toString().startsWith(parent.getOperationName().toString())
67+
}
68+
69+
cleanup:
70+
executor.shutdownNow()
71+
72+
where:
73+
executor | action | function
74+
new ForkJoinPool() | "submission" | submitRunnable
75+
new ForkJoinPool(10) | "submission" | submitRunnable
76+
Executors.newSingleThreadExecutor() | "submission" | submitRunnable
77+
Executors.newSingleThreadScheduledExecutor() | "submission" | submitRunnable
78+
Executors.newFixedThreadPool(10) | "submission" | submitRunnable
79+
Executors.newScheduledThreadPool(10) | "submission" | submitRunnable
80+
Executors.newCachedThreadPool() | "submission" | submitRunnable
81+
new DefaultEventLoopGroup(10) | "submission" | submitRunnable
82+
new DefaultEventLoopGroup(1).next() | "submission" | submitRunnable
83+
// TODO - flaky - seems to be relying on PendingTrace flush
84+
// new UnorderedThreadPoolEventExecutor(10) | "submission" | submitRunnable
85+
new NioEventLoopGroup(10) | "submission" | submitRunnable
86+
new DefaultEventExecutor() | "submission" | submitRunnable
87+
new org.apache.tomcat.util.threads.ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new TaskQueue()) | "submission" | submitRunnable
88+
new ForkJoinPool() | "execution" | executeRunnable
89+
new ForkJoinPool(10) | "execution" | executeRunnable
90+
Executors.newSingleThreadExecutor() | "execution" | executeRunnable
91+
Executors.newSingleThreadScheduledExecutor() | "execution" | executeRunnable
92+
Executors.newFixedThreadPool(10) | "execution" | executeRunnable
93+
Executors.newScheduledThreadPool(10) | "execution" | executeRunnable
94+
Executors.newCachedThreadPool() | "execution" | executeRunnable
95+
new DefaultEventLoopGroup(10) | "execution" | executeRunnable
96+
new DefaultEventLoopGroup(1).next() | "execution" | executeRunnable
97+
// TODO - flaky - seems to be relying on PendingTrace flush
98+
// new UnorderedThreadPoolEventExecutor(10) | "execution" | executeRunnable
99+
new NioEventLoopGroup(10) | "execution" | executeRunnable
100+
new DefaultEventExecutor() | "execution" | executeRunnable
101+
new org.apache.tomcat.util.threads.ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new TaskQueue()) | "execution" | executeRunnable
102+
}
103+
104+
def "netty event loop internal executions in #executor are traced with correct lineage" () {
105+
setup:
106+
ExecutorService pool = executor
107+
when:
108+
109+
int taskCount = 200
110+
for (int i = 0; i < taskCount; ++i) {
111+
pool.execute({
112+
String threadName = Thread.currentThread().getName()
113+
runUnderTrace(threadName) {
114+
pool.execute(new Descendant(threadName))
115+
}
116+
})
117+
}
118+
119+
TEST_WRITER.waitForTraces(taskCount)
120+
then:
121+
for (List<DDSpan> trace : TEST_WRITER) {
122+
assert trace.size() == 2
123+
DDSpan parent = trace.find({ it.isRootSpan() })
124+
assert null != parent
125+
DDSpan child = trace.find({ it.getParentId() == parent.getSpanId() })
126+
assert null != child
127+
assert child.getOperationName().toString().startsWith(parent.getOperationName().toString())
128+
}
129+
130+
where:
131+
executor << [
132+
new ThreadPerChannelEventLoop(new OioEventLoopGroup()),
133+
new DefaultEventExecutor(),
134+
new NioEventLoopGroup(10),
135+
new DefaultEventLoopGroup(10)
136+
// flaky
137+
// new UnorderedThreadPoolEventExecutor(10)
138+
]
139+
}
140+
141+
}

dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy

+10-6
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,16 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
179179
"invokeAny with timeout" | invokeAnyTimeout | new org.apache.tomcat.util.threads.ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new TaskQueue())
180180

181181
// guava
182+
"execute Runnable" | executeRunnable | MoreExecutors.directExecutor()
183+
184+
"execute Runnable" | executeRunnable | MoreExecutors.newDirectExecutorService()
185+
"submit Runnable" | submitRunnable | MoreExecutors.newDirectExecutorService()
186+
"submit Callable" | submitCallable | MoreExecutors.newDirectExecutorService()
187+
"invokeAll" | invokeAll | MoreExecutors.newDirectExecutorService()
188+
"invokeAll with timeout" | invokeAllTimeout | MoreExecutors.newDirectExecutorService()
189+
"invokeAny" | invokeAny | MoreExecutors.newDirectExecutorService()
190+
"invokeAny with timeout" | invokeAnyTimeout | MoreExecutors.newDirectExecutorService()
191+
182192
"execute Runnable" | executeRunnable | MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())
183193
"submit Runnable" | submitRunnable | MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())
184194
"submit Callable" | submitCallable | MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())
@@ -250,12 +260,6 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
250260
activeScope().setAsyncPropagation(true)
251261
try {
252262
for (int i = 0; i < 20; ++i) {
253-
// Our current instrumentation instrumentation does not behave very well
254-
// if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned'
255-
// child traces sometimes since state can contain only one continuation - and
256-
// we do not really have a good way for attributing work to correct parent span
257-
// if we reuse Callable/Runnable.
258-
// Solution for now is to never reuse a Callable/Runnable.
259263
final JavaAsyncChild child = new JavaAsyncChild(false, true)
260264
children.add(child)
261265
try {

0 commit comments

Comments
 (0)