Skip to content

Commit 07d1350

Browse files
Add span propagation for Pekko scheduled tasks (#8765)
1 parent 19cd36d commit 07d1350

File tree

5 files changed

+150
-6
lines changed

5 files changed

+150
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package datadog.trace.instrumentation.pekko.concurrent;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
5+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
6+
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
7+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
8+
9+
import com.google.auto.service.AutoService;
10+
import datadog.trace.agent.tooling.Instrumenter;
11+
import datadog.trace.agent.tooling.InstrumenterModule;
12+
import datadog.trace.api.InstrumenterConfig;
13+
import datadog.trace.bootstrap.InstrumentationContext;
14+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
15+
import java.util.Collections;
16+
import java.util.Map;
17+
import net.bytebuddy.asm.Advice;
18+
19+
/** Active span capturing and continuation for Pekko's async scheduled tasks. */
20+
@AutoService(InstrumenterModule.class)
21+
public class PekkoSchedulerInstrumentation extends InstrumenterModule.Tracing
22+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
23+
24+
public PekkoSchedulerInstrumentation() {
25+
super("java_concurrent", "pekko_concurrent", "pekko_scheduler");
26+
}
27+
28+
@Override
29+
protected boolean defaultEnabled() {
30+
return InstrumenterConfig.get().isPekkoSchedulerEnabled();
31+
}
32+
33+
@Override
34+
public Map<String, String> contextStore() {
35+
return Collections.singletonMap(Runnable.class.getName(), State.class.getName());
36+
}
37+
38+
@Override
39+
public String instrumentedType() {
40+
return "org.apache.pekko.actor.LightArrayRevolverScheduler";
41+
}
42+
43+
@Override
44+
public void methodAdvice(MethodTransformer transformer) {
45+
transformer.applyAdvice(
46+
isMethod()
47+
.and(nameEndsWith("schedule"))
48+
.and(takesArgument(0, named("scala.concurrent.ExecutionContext")))
49+
.and(takesArgument(1, Runnable.class))
50+
.and(takesArgument(2, named("scala.concurrent.duration.FiniteDuration"))),
51+
getClass().getName() + "$Schedule");
52+
}
53+
54+
public static final class Schedule {
55+
@Advice.OnMethodEnter
56+
public static void schedule(@Advice.Argument(1) Runnable runnable) {
57+
capture(InstrumentationContext.get(Runnable.class, State.class), runnable);
58+
}
59+
}
60+
}

dd-java-agent/instrumentation/pekko-concurrent/src/test/groovy/PekkoActorTest.groovy

+39
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
11
import datadog.trace.agent.test.AgentTestRunner
2+
import datadog.trace.api.config.TraceInstrumentationConfig
23
import datadog.trace.bootstrap.instrumentation.api.Tags
34
import spock.lang.Shared
45

56
class PekkoActorTest extends AgentTestRunner {
7+
68
@Shared
79
def pekkoTester = new PekkoActors()
810

11+
@Override
12+
void configurePreAgent() {
13+
super.configurePreAgent()
14+
15+
injectSysConfig(TraceInstrumentationConfig.TRACE_PEKKO_SCHEDULER_ENABLED, "true")
16+
}
17+
918
def "pekko actor send #name #iterations"() {
1019
setup:
1120
def barrier = pekkoTester.block(name)
@@ -54,6 +63,36 @@ class PekkoActorTest extends AgentTestRunner {
5463
"route" | "Rekko" | "How you doin'" | 10
5564
}
5665

66+
def "pekko actor scheduling"() {
67+
when:
68+
def scheduleBarrier = pekkoTester.schedule()
69+
scheduleBarrier.acquire()
70+
71+
then:
72+
assertTraces(1) {
73+
trace(2) {
74+
sortSpansByStart()
75+
span {
76+
resourceName "PekkoActors.schedule"
77+
operationName "schedulerSpan"
78+
tags {
79+
"$Tags.COMPONENT" "trace"
80+
defaultTags()
81+
}
82+
}
83+
span {
84+
resourceName "Scheduler.tracedChild"
85+
operationName "scheduledOperationSpan"
86+
childOf(span(0))
87+
tags {
88+
"$Tags.COMPONENT" "trace"
89+
defaultTags()
90+
}
91+
}
92+
}
93+
}
94+
}
95+
5796
def "actor message handling should close leaked scopes"() {
5897
when:
5998
pekkoTester.leak("Leaker", "drip")

dd-java-agent/instrumentation/pekko-concurrent/src/test/scala/PekkoActors.scala

+40-6
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
1+
import Scheduler.Schedule
2+
13
import java.util.concurrent.Semaphore
24
import org.apache.pekko.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
35
import org.apache.pekko.pattern.ask
46
import org.apache.pekko.routing.RoundRobinPool
57
import org.apache.pekko.util.Timeout
68
import datadog.trace.api.Trace
7-
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.{
8-
activateSpan,
9-
setAsyncPropagationEnabled,
10-
activeSpan,
11-
startSpan
12-
}
9+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.{activateSpan, activeSpan, setAsyncPropagationEnabled, startSpan}
1310

1411
import scala.concurrent.duration._
1512

@@ -21,6 +18,9 @@ class PekkoActors extends AutoCloseable {
2118
system.actorOf(Forwarder.props(receiver), "forwarder")
2219
val router: ActorRef =
2320
system.actorOf(RoundRobinPool(5).props(Props[Receiver]()), "router")
21+
val scheduler: ActorRef =
22+
system.actorOf(Scheduler.props, "scheduler")
23+
2424
val tellGreeter: ActorRef =
2525
system.actorOf(Greeter.props("Howdy", receiver), "tell-greeter")
2626
val askGreeter: ActorRef =
@@ -72,6 +72,15 @@ class PekkoActors extends AutoCloseable {
7272
tellGreeter ! WhoToGreet(who)
7373
tellGreeter ! Leak(leak)
7474
}
75+
76+
@Trace
77+
def schedule(): Semaphore = {
78+
val barrier = new Semaphore(0)
79+
setAsyncPropagationEnabled(true)
80+
activeSpan().setSpanName("schedulerSpan")
81+
scheduler ! Schedule(barrier)
82+
barrier
83+
}
7584
}
7685

7786
object PekkoActors {
@@ -162,3 +171,28 @@ class Forwarder(receiverActor: ActorRef) extends Actor with ActorLogging {
162171
}
163172
}
164173
}
174+
175+
object Scheduler {
176+
def props: Props = Props[Scheduler]
177+
178+
final case class Schedule(barrier: Semaphore)
179+
final case class Execute(barrier: Semaphore)
180+
}
181+
182+
class Scheduler extends Actor with ActorLogging {
183+
import Scheduler._
184+
import context.dispatcher
185+
186+
def receive = {
187+
case Schedule(barrier) =>
188+
context.system.scheduler.scheduleOnce(1.second, self, Execute(barrier))
189+
case Execute(barrier) =>
190+
tracedChild(barrier)
191+
}
192+
193+
@Trace
194+
def tracedChild(barrier: Semaphore): Unit = {
195+
activeSpan().setSpanName("scheduledOperationSpan")
196+
barrier.release(1)
197+
}
198+
}

dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java

+2
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ public final class TraceInstrumentationConfig {
167167
"trace.websocket.messages.separate.traces";
168168
public static final String TRACE_WEBSOCKET_TAG_SESSION_ID = "trace.websocket.tag.session-id";
169169

170+
public static final String TRACE_PEKKO_SCHEDULER_ENABLED = "trace.pekko.scheduler.enabled";
171+
170172
public static final String JAX_RS_EXCEPTION_AS_ERROR_ENABLED =
171173
"trace.jax-rs.exception-as-error.enabled";
172174
public static final String JAX_RS_ADDITIONAL_ANNOTATIONS = "trace.jax-rs.additional.annotations";

internal-api/src/main/java/datadog/trace/api/InstrumenterConfig.java

+9
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_EXTENSIONS_PATH;
6565
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_METHODS;
6666
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_OTEL_ENABLED;
67+
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_PEKKO_SCHEDULER_ENABLED;
6768
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_THREAD_POOL_EXECUTORS_EXCLUDE;
6869
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_WEBSOCKET_MESSAGES_ENABLED;
6970
import static datadog.trace.api.config.UsmConfig.USM_ENABLED;
@@ -126,6 +127,7 @@ public class InstrumenterConfig {
126127
private final String httpURLConnectionClassName;
127128
private final String axisTransportClassName;
128129
private final boolean websocketTracingEnabled;
130+
private final boolean pekkoSchedulerEnabled;
129131

130132
private final boolean directAllocationProfilingEnabled;
131133

@@ -278,6 +280,7 @@ private InstrumenterConfig() {
278280
this.websocketTracingEnabled =
279281
configProvider.getBoolean(
280282
TRACE_WEBSOCKET_MESSAGES_ENABLED, DEFAULT_WEBSOCKET_MESSAGES_ENABLED);
283+
this.pekkoSchedulerEnabled = configProvider.getBoolean(TRACE_PEKKO_SCHEDULER_ENABLED, false);
281284
}
282285

283286
public boolean isCodeOriginEnabled() {
@@ -505,6 +508,10 @@ public boolean isWebsocketTracingEnabled() {
505508
return websocketTracingEnabled;
506509
}
507510

511+
public boolean isPekkoSchedulerEnabled() {
512+
return pekkoSchedulerEnabled;
513+
}
514+
508515
/**
509516
* Check whether asynchronous result types are supported with @Trace annotation.
510517
*
@@ -637,6 +644,8 @@ public String toString() {
637644
+ additionalJaxRsAnnotations
638645
+ ", websocketTracingEnabled="
639646
+ websocketTracingEnabled
647+
+ ", pekkoSchedulerEnabled="
648+
+ pekkoSchedulerEnabled
640649
+ '}';
641650
}
642651
}

0 commit comments

Comments
 (0)