Skip to content

Commit af5f441

Browse files
add queue type and length to queue events (#8242)
1 parent c58164b commit af5f441

File tree

16 files changed

+284
-36
lines changed

16 files changed

+284
-36
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/QueueTimerHelper.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,31 @@ private static final class RateLimiterHolder {
3030
}
3131

3232
public static <T> void startQueuingTimer(
33-
ContextStore<T, State> taskContextStore, Class<?> schedulerClass, T task) {
33+
ContextStore<T, State> taskContextStore,
34+
Class<?> schedulerClass,
35+
Class<?> queueClass,
36+
int queueLength,
37+
T task) {
3438
State state = taskContextStore.get(task);
35-
startQueuingTimer(state, schedulerClass, task);
39+
startQueuingTimer(state, schedulerClass, queueClass, queueLength, task);
3640
}
3741

38-
public static void startQueuingTimer(State state, Class<?> schedulerClass, Object task) {
42+
public static void startQueuingTimer(
43+
State state, Class<?> schedulerClass, Class<?> queueClass, int queueLength, Object task) {
3944
if (Platform.isNativeImage()) {
4045
// explicitly not supported for Graal native image
4146
return;
4247
}
48+
// TODO consider queue length based sampling here to reduce overhead
4349
// avoid calling this before JFR is initialised because it will lead to reading the wrong
4450
// TSC frequency before JFR has set it up properly
4551
if (task != null && state != null && InstrumentationBasedProfiling.isJFRReady()) {
4652
QueueTiming timing =
4753
(QueueTiming) AgentTracer.get().getProfilingContext().start(Timer.TimerType.QUEUEING);
4854
timing.setTask(task);
4955
timing.setScheduler(schedulerClass);
56+
timing.setQueue(queueClass);
57+
timing.setQueueLength(queueLength);
5058
state.setTiming(timing);
5159
}
5260
}

dd-java-agent/agent-profiling/profiling-controller-openjdk/src/main/java/com/datadog/profiling/controller/openjdk/events/QueueTimeEvent.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ public class QueueTimeEvent extends Event implements QueueTiming {
3333
@Label("Scheduler")
3434
private Class<?> scheduler;
3535

36+
@Label("Queue")
37+
private Class<?> queueType;
38+
39+
@Label("Queue Length on Entry")
40+
private int queueLength;
41+
3642
public QueueTimeEvent() {
3743
this.origin = Thread.currentThread();
3844
AgentSpan activeSpan = AgentTracer.activeSpan();
@@ -55,6 +61,16 @@ public void setScheduler(Class<?> scheduler) {
5561
this.scheduler = scheduler;
5662
}
5763

64+
@Override
65+
public void setQueue(Class<?> queueType) {
66+
this.queueType = queueType;
67+
}
68+
69+
@Override
70+
public void setQueueLength(int queueLength) {
71+
this.queueLength = queueLength;
72+
}
73+
5874
@Override
5975
public void report() {
6076
commit();

dd-java-agent/agent-profiling/profiling-ddprof/src/main/java/com/datadog/profiling/ddprof/DatadogProfiler.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,14 +451,21 @@ boolean shouldRecordQueueTimeEvent(long startMillis) {
451451
return System.currentTimeMillis() - startMillis >= queueTimeThresholdMillis;
452452
}
453453

454-
void recordQueueTimeEvent(long startTicks, Object task, Class<?> scheduler, Thread origin) {
454+
void recordQueueTimeEvent(
455+
long startTicks,
456+
Object task,
457+
Class<?> scheduler,
458+
Class<?> queueType,
459+
int queueLength,
460+
Thread origin) {
455461
if (profiler != null) {
456462
// note: because this type traversal can update secondary_super_cache (see JDK-8180450)
457463
// we avoid doing this unless we are absolutely certain we will record the event
458464
Class<?> taskType = TaskWrapper.getUnwrappedType(task);
459465
if (taskType != null) {
460466
long endTicks = profiler.getCurrentTicks();
461-
profiler.recordQueueTime(startTicks, endTicks, taskType, scheduler, origin);
467+
profiler.recordQueueTime(
468+
startTicks, endTicks, taskType, scheduler, queueType, queueLength, origin);
462469
}
463470
}
464471
}

dd-java-agent/agent-profiling/profiling-ddprof/src/main/java/com/datadog/profiling/ddprof/QueueTimeTracker.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public class QueueTimeTracker implements QueueTiming {
1313
// FIXME this can be eliminated by altering the instrumentation
1414
// since it is known when the item is polled from the queue
1515
private Class<?> scheduler;
16+
private Class<?> queue;
17+
private int queueLength;
1618

1719
public QueueTimeTracker(DatadogProfiler profiler, long startTicks) {
1820
this.profiler = profiler;
@@ -31,13 +33,23 @@ public void setScheduler(Class<?> scheduler) {
3133
this.scheduler = scheduler;
3234
}
3335

36+
@Override
37+
public void setQueue(Class<?> queue) {
38+
this.queue = queue;
39+
}
40+
41+
@Override
42+
public void setQueueLength(int queueLength) {
43+
this.queueLength = queueLength;
44+
}
45+
3446
@Override
3547
public void report() {
3648
assert weakTask != null && scheduler != null;
3749
Object task = this.weakTask.get();
3850
if (task != null) {
3951
// indirection reduces shallow size of the tracker instance
40-
profiler.recordQueueTimeEvent(startTicks, task, scheduler, origin);
52+
profiler.recordQueueTimeEvent(startTicks, task, scheduler, queue, queueLength, origin);
4153
}
4254
}
4355

dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/QueuedCommandInstrumentation.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.nio.channels.Channel;
2121
import java.util.Collections;
2222
import java.util.Map;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
2324
import net.bytebuddy.asm.Advice;
2425

2526
@AutoService(InstrumenterModule.class)
@@ -66,7 +67,14 @@ public static final class Construct {
6667
public static void after(@Advice.This Object command) {
6768
ContextStore<Object, State> contextStore = InstrumentationContext.get(QUEUED_COMMAND, STATE);
6869
capture(contextStore, command);
69-
QueueTimerHelper.startQueuingTimer(contextStore, Channel.class, command);
70+
// FIXME hard to handle both the lifecyle and get access to the queue instance in the same
71+
// frame within the WriteQueue class.
72+
// This means we can't get the queue length. A (bad) alternative would be to instrument
73+
// ConcurrentLinkedQueue broadly,
74+
// or we could write more brittle instrumentation targeting code patterns in different gRPC
75+
// versions.
76+
QueueTimerHelper.startQueuingTimer(
77+
contextStore, Channel.class, ConcurrentLinkedQueue.class, 0, command);
7078
}
7179
}
7280

dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/executor/ThreadPoolExecutorInstrumentation.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Collections;
3232
import java.util.HashMap;
3333
import java.util.Map;
34+
import java.util.Queue;
3435
import java.util.concurrent.RunnableFuture;
3536
import java.util.concurrent.ThreadPoolExecutor;
3637
import net.bytebuddy.asm.Advice;
@@ -162,12 +163,20 @@ public static void capture(
162163
// excluded as
163164
// Runnables but it is not until now that they will be put on the executor's queue
164165
if (!exclude(RUNNABLE, task)) {
166+
Queue<?> queue = tpe.getQueue();
165167
QueueTimerHelper.startQueuingTimer(
166-
InstrumentationContext.get(Runnable.class, State.class), tpe.getClass(), task);
168+
InstrumentationContext.get(Runnable.class, State.class),
169+
tpe.getClass(),
170+
queue.getClass(),
171+
queue.size(),
172+
task);
167173
} else if (!exclude(RUNNABLE_FUTURE, task) && task instanceof RunnableFuture) {
174+
Queue<?> queue = tpe.getQueue();
168175
QueueTimerHelper.startQueuingTimer(
169176
InstrumentationContext.get(RunnableFuture.class, State.class),
170177
tpe.getClass(),
178+
queue.getClass(),
179+
queue.size(),
171180
(RunnableFuture<?>) task);
172181
}
173182
}

dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/forkjoin/JavaForkJoinPoolInstrumentation.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616
import datadog.trace.agent.tooling.InstrumenterModule;
1717
import datadog.trace.bootstrap.ContextStore;
1818
import datadog.trace.bootstrap.InstrumentationContext;
19-
import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper;
2019
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
2120
import java.util.Map;
22-
import java.util.concurrent.ForkJoinPool;
2321
import java.util.concurrent.ForkJoinTask;
2422
import net.bytebuddy.asm.Advice;
2523

@@ -53,13 +51,11 @@ public void methodAdvice(MethodTransformer transformer) {
5351
public static final class ExternalPush {
5452
@SuppressWarnings("rawtypes")
5553
@Advice.OnMethodEnter
56-
public static <T> void externalPush(
57-
@Advice.This ForkJoinPool pool, @Advice.Argument(0) ForkJoinTask<T> task) {
54+
public static <T> void externalPush(@Advice.Argument(0) ForkJoinTask<T> task) {
5855
if (!exclude(FORK_JOIN_TASK, task)) {
5956
ContextStore<ForkJoinTask, State> contextStore =
6057
InstrumentationContext.get(ForkJoinTask.class, State.class);
6158
capture(contextStore, task);
62-
QueueTimerHelper.startQueuingTimer(contextStore, pool.getClass(), task);
6359
}
6460
}
6561

@@ -74,13 +70,11 @@ public static <T> void cleanup(
7470

7571
public static final class PoolSubmit {
7672
@Advice.OnMethodEnter
77-
public static <T> void poolSubmit(
78-
@Advice.This ForkJoinPool pool, @Advice.Argument(1) ForkJoinTask<T> task) {
73+
public static <T> void poolSubmit(@Advice.Argument(1) ForkJoinTask<T> task) {
7974
if (!exclude(FORK_JOIN_TASK, task)) {
8075
ContextStore<ForkJoinTask, State> contextStore =
8176
InstrumentationContext.get(ForkJoinTask.class, State.class);
8277
capture(contextStore, task);
83-
QueueTimerHelper.startQueuingTimer(contextStore, pool.getClass(), task);
8478
}
8579
}
8680

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package datadog.trace.instrumentation.java.concurrent.forkjoin;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.declaresField;
4+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
5+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.FORK_JOIN_TASK;
6+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.exclude;
7+
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME;
8+
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.FORK_JOIN_POOL_INSTRUMENTATION_NAME;
9+
import static java.util.Collections.singletonMap;
10+
import static net.bytebuddy.matcher.ElementMatchers.fieldType;
11+
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
12+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
13+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
14+
15+
import com.google.auto.service.AutoService;
16+
import datadog.trace.agent.tooling.Instrumenter;
17+
import datadog.trace.agent.tooling.InstrumenterModule;
18+
import datadog.trace.api.config.ProfilingConfig;
19+
import datadog.trace.bootstrap.ContextStore;
20+
import datadog.trace.bootstrap.InstrumentationContext;
21+
import datadog.trace.bootstrap.config.provider.ConfigProvider;
22+
import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper;
23+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
24+
import java.util.Map;
25+
import java.util.concurrent.ForkJoinPool;
26+
import java.util.concurrent.ForkJoinTask;
27+
import net.bytebuddy.asm.Advice;
28+
29+
@AutoService(InstrumenterModule.class)
30+
public class JavaForkJoinWorkQueueInstrumentation extends InstrumenterModule.Profiling
31+
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
32+
33+
public JavaForkJoinWorkQueueInstrumentation() {
34+
super(
35+
EXECUTOR_INSTRUMENTATION_NAME,
36+
FORK_JOIN_POOL_INSTRUMENTATION_NAME,
37+
FORK_JOIN_POOL_INSTRUMENTATION_NAME + "-workqueue");
38+
}
39+
40+
@Override
41+
public String instrumentedType() {
42+
return "java.util.concurrent.ForkJoinPool$WorkQueue";
43+
}
44+
45+
@Override
46+
public boolean isEnabled() {
47+
return super.isEnabled()
48+
&& ConfigProvider.getInstance()
49+
.getBoolean(
50+
ProfilingConfig.PROFILING_QUEUEING_TIME_ENABLED,
51+
ProfilingConfig.PROFILING_QUEUEING_TIME_ENABLED_DEFAULT);
52+
}
53+
54+
@Override
55+
public Map<String, String> contextStore() {
56+
return singletonMap("java.util.concurrent.ForkJoinTask", State.class.getName());
57+
}
58+
59+
@Override
60+
public void methodAdvice(MethodTransformer transformer) {
61+
String name = getClass().getName();
62+
transformer.applyAdvice(
63+
isMethod()
64+
.and(named("push"))
65+
.and(takesArgument(0, named("java.util.concurrent.ForkJoinTask")))
66+
.and(
67+
isDeclaredBy(
68+
declaresField(fieldType(int.class).and(named("top")))
69+
.and(declaresField(fieldType(int.class).and(named("base")))))),
70+
name + "$PushTask");
71+
}
72+
73+
public static final class PushTask {
74+
@SuppressWarnings("rawtypes")
75+
@Advice.OnMethodEnter(suppress = Throwable.class)
76+
public static <T> void push(
77+
@Advice.This Object workQueue,
78+
@Advice.FieldValue("top") int top,
79+
@Advice.FieldValue("base") int base,
80+
@Advice.Argument(0) ForkJoinTask<T> task) {
81+
if (!exclude(FORK_JOIN_TASK, task)) {
82+
ContextStore<ForkJoinTask, State> contextStore =
83+
InstrumentationContext.get(ForkJoinTask.class, State.class);
84+
QueueTimerHelper.startQueuingTimer(
85+
contextStore, ForkJoinPool.class, workQueue.getClass(), top - base, task);
86+
}
87+
}
88+
}
89+
}

dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/timer/JavaTimerInstrumentation.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
66
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.RUNNABLE;
77
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.exclude;
8-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper.startQueuingTimer;
98
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME;
109
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.RUNNABLE_INSTRUMENTATION_NAME;
1110
import static java.util.Collections.singletonMap;
@@ -21,7 +20,6 @@
2120
import datadog.trace.bootstrap.InstrumentationContext;
2221
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
2322
import java.util.Map;
24-
import java.util.Timer;
2523
import java.util.TimerTask;
2624
import net.bytebuddy.asm.Advice;
2725

@@ -67,7 +65,6 @@ public static void before(@Advice.Argument(0) TimerTask task, @Advice.Argument(2
6765
ContextStore<Runnable, State> contextStore =
6866
InstrumentationContext.get(Runnable.class, State.class);
6967
capture(contextStore, task);
70-
startQueuingTimer(contextStore, Timer.class, task);
7168
}
7269
}
7370

dd-java-agent/instrumentation/java-concurrent/src/test/groovy/QueueTimingForkedTest.groovy

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import datadog.trace.agent.test.AgentTestRunner
22
import datadog.trace.agent.test.TestProfilingContextIntegration
3+
import datadog.trace.api.Platform
34
import datadog.trace.bootstrap.instrumentation.jfr.InstrumentationBasedProfiling
45

56
import java.util.concurrent.Executors
7+
import java.util.concurrent.ForkJoinPool
8+
import java.util.concurrent.LinkedBlockingQueue
69

710
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
811

@@ -20,21 +23,34 @@ class QueueTimingForkedTest extends AgentTestRunner {
2023
def "test queue timing with submit"() {
2124
setup:
2225
def executor = Executors.newSingleThreadExecutor()
26+
def fjp = new ForkJoinPool(1)
2327

2428
when:
2529
runUnderTrace("parent", {
2630
executor.submit(new TestRunnable()).get()
2731
})
2832

2933
then:
30-
verify()
34+
verify(LinkedBlockingQueue.name)
35+
36+
when:
37+
runUnderTrace("parent", {
38+
fjp.submit(new TestRunnable()).get()
39+
})
40+
41+
then:
42+
// flaky before JDK21
43+
if (Platform.isJavaVersionAtLeast(21)) {
44+
verify("java.util.concurrent.ForkJoinPool\$WorkQueue")
45+
}
3146

3247
cleanup:
3348
executor.shutdown()
49+
fjp.shutdown()
3450
TEST_PROFILING_CONTEXT_INTEGRATION.closedTimings.clear()
3551
}
3652

37-
void verify() {
53+
void verify(expectedQueueType) {
3854
assert TEST_PROFILING_CONTEXT_INTEGRATION.isBalanced()
3955
assert !TEST_PROFILING_CONTEXT_INTEGRATION.closedTimings.isEmpty()
4056
int numAsserts = 0
@@ -45,6 +61,8 @@ class QueueTimingForkedTest extends AgentTestRunner {
4561
assert timing.task == TestRunnable
4662
assert timing.scheduler != null
4763
assert timing.origin == Thread.currentThread()
64+
assert timing.queueLength >= 0
65+
assert timing.queue.name == expectedQueueType
4866
numAsserts++
4967
}
5068
}

0 commit comments

Comments
 (0)