Skip to content

Commit 79542fb

Browse files
instrument netty rejected execution handlers
1 parent 8a3869a commit 79542fb

File tree

3 files changed

+65
-19
lines changed

3 files changed

+65
-19
lines changed

dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/RejectedExecutionHandlerInstrumentation.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package datadog.trace.instrumentation.java.concurrent;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.DDElementMatchers.hasInterface;
4+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
45
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.cancelTask;
56
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
7+
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
68
import static net.bytebuddy.matcher.ElementMatchers.named;
79
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
810

@@ -34,7 +36,8 @@ public ElementMatcher<? super TypeDescription> typeMatcher() {
3436
"java.util.concurrent.ThreadPoolExecutor$DiscardPolicy",
3537
"java.util.concurrent.ThreadPoolExecutor$DiscardOldestPolicy",
3638
"java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy")
37-
.or(hasInterface(named("java.util.concurrent.RejectedExecutionHandler")));
39+
.or(hasInterface(named("java.util.concurrent.RejectedExecutionHandler")))
40+
.or(hasInterface(nameEndsWith("netty.util.concurrent.RejectedExecutionHandler")));
3841
}
3942

4043
@Override
@@ -50,9 +53,10 @@ public Map<String, String> contextStore() {
5053
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
5154
return Collections.singletonMap(
5255
isMethod()
53-
.and(named("rejectedExecution"))
54-
.and(takesArgument(0, named("java.lang.Runnable")))
55-
.and(takesArgument(1, named("java.util.concurrent.ThreadPoolExecutor"))),
56+
// JDK or netty
57+
.and(namedOneOf("rejectedExecution", "rejected"))
58+
// must not constrain or use second parameter
59+
.and(takesArgument(0, named("java.lang.Runnable"))),
5660
getClass().getName() + "$Reject");
5761
}
5862

dd-java-agent/instrumentation/java-concurrent/src/test/groovy/RejectedExecutionTest.groovy

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import datadog.trace.agent.test.AgentTestRunner
22
import datadog.trace.api.DDId
33
import datadog.trace.core.DDSpan
4+
import io.netty.util.concurrent.DefaultEventExecutor
5+
import io.netty.util.concurrent.DefaultThreadFactory
46

57
import java.util.concurrent.ArrayBlockingQueue
68
import java.util.concurrent.CountDownLatch
@@ -134,20 +136,7 @@ class RejectedExecutionTest extends AgentTestRunner {
134136
testClosure()
135137

136138
then:
137-
TEST_WRITER.waitForTraces(1)
138-
TEST_WRITER.size() == 1
139-
List<DDSpan> trace = TEST_WRITER.get(0)
140-
trace.size() == 2
141-
DDSpan parent = trace.find {
142-
it.getOperationName() == "parent"
143-
}
144-
DDSpan child = trace.find {
145-
it.getOperationName() == "asyncChild"
146-
}
147-
parent != null
148-
child != null
149-
parent.getParentId() == DDId.ZERO
150-
parent.getSpanId() == child.getParentId()
139+
expectParentChildTrace()
151140

152141

153142
where:
@@ -164,6 +153,54 @@ class RejectedExecutionTest extends AgentTestRunner {
164153
]
165154
}
166155

156+
def "trace reported with swallowing netty rejected execution handler" () {
157+
setup:
158+
DefaultEventExecutor executor = new DefaultEventExecutor(null, new DefaultThreadFactory(DefaultEventExecutor),
159+
1, handler)
160+
CountDownLatch latch = new CountDownLatch(1)
161+
// this task will block the executor thread ensuring the next task gets enqueued
162+
executor.submit({
163+
latch.await()
164+
})
165+
// this will sit in the queue
166+
executor.submit({})
167+
168+
when:
169+
runUnderTrace("parent") {
170+
activeScope().setAsyncPropagation(true)
171+
// must be rejected because the queue will be full until some
172+
// time after the first task is released
173+
executor.submit((Runnable) new JavaAsyncChild(true, false))
174+
latch.countDown()
175+
}
176+
177+
then:
178+
expectParentChildTrace()
179+
180+
where:
181+
handler << [
182+
new SwallowingRejectedExecutionHandler()
183+
]
184+
}
185+
186+
def expectParentChildTrace() {
187+
TEST_WRITER.waitForTraces(1)
188+
TEST_WRITER.size() == 1
189+
List<DDSpan> trace = TEST_WRITER.get(0)
190+
assert trace.size() == 2
191+
DDSpan parent = trace.find {
192+
it.getOperationName() == "parent"
193+
}
194+
DDSpan child = trace.find {
195+
it.getOperationName() == "asyncChild"
196+
}
197+
assert parent != null
198+
assert child != null
199+
assert parent.getParentId() == DDId.ZERO
200+
assert parent.getSpanId() == child.getParentId()
201+
return true
202+
}
203+
167204
def setupShutdownExecutor(RejectedExecutionHandler rejectedExecutionHandler) {
168205
ExecutorService pool = new ThreadPoolExecutor(1,
169206
1,
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
import io.netty.util.concurrent.SingleThreadEventExecutor;
12
import java.util.concurrent.RejectedExecutionHandler;
23
import java.util.concurrent.ThreadPoolExecutor;
34

4-
public class SwallowingRejectedExecutionHandler implements RejectedExecutionHandler {
5+
public class SwallowingRejectedExecutionHandler
6+
implements RejectedExecutionHandler, io.netty.util.concurrent.RejectedExecutionHandler {
57
@Override
68
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {}
9+
10+
@Override
11+
public void rejected(Runnable task, SingleThreadEventExecutor executor) {}
712
}

0 commit comments

Comments
 (0)