diff --git a/dd-java-agent/instrumentation/pekko-concurrent/src/main/java/datadog/trace/instrumentation/pekko/concurrent/PekkoSchedulerInstrumentation.java b/dd-java-agent/instrumentation/pekko-concurrent/src/main/java/datadog/trace/instrumentation/pekko/concurrent/PekkoSchedulerInstrumentation.java new file mode 100644 index 00000000000..4522144964b --- /dev/null +++ b/dd-java-agent/instrumentation/pekko-concurrent/src/main/java/datadog/trace/instrumentation/pekko/concurrent/PekkoSchedulerInstrumentation.java @@ -0,0 +1,60 @@ +package datadog.trace.instrumentation.pekko.concurrent; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.api.InstrumenterConfig; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import java.util.Collections; +import java.util.Map; +import net.bytebuddy.asm.Advice; + +/** Active span capturing and continuation for Pekko's async scheduled tasks. */ +@AutoService(InstrumenterModule.class) +public class PekkoSchedulerInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public PekkoSchedulerInstrumentation() { + super("java_concurrent", "pekko_concurrent", "pekko_scheduler"); + } + + @Override + protected boolean defaultEnabled() { + return InstrumenterConfig.get().isPekkoSchedulerEnabled(); + } + + @Override + public Map contextStore() { + return Collections.singletonMap(Runnable.class.getName(), State.class.getName()); + } + + @Override + public String instrumentedType() { + return "org.apache.pekko.actor.LightArrayRevolverScheduler"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(nameEndsWith("schedule")) + .and(takesArgument(0, named("scala.concurrent.ExecutionContext"))) + .and(takesArgument(1, Runnable.class)) + .and(takesArgument(2, named("scala.concurrent.duration.FiniteDuration"))), + getClass().getName() + "$Schedule"); + } + + public static final class Schedule { + @Advice.OnMethodEnter + public static void schedule(@Advice.Argument(1) Runnable runnable) { + capture(InstrumentationContext.get(Runnable.class, State.class), runnable); + } + } +} diff --git a/dd-java-agent/instrumentation/pekko-concurrent/src/test/groovy/PekkoActorTest.groovy b/dd-java-agent/instrumentation/pekko-concurrent/src/test/groovy/PekkoActorTest.groovy index 212a1e7e18c..afc2ae5ff59 100644 --- a/dd-java-agent/instrumentation/pekko-concurrent/src/test/groovy/PekkoActorTest.groovy +++ b/dd-java-agent/instrumentation/pekko-concurrent/src/test/groovy/PekkoActorTest.groovy @@ -1,11 +1,20 @@ import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.api.config.TraceInstrumentationConfig import datadog.trace.bootstrap.instrumentation.api.Tags import spock.lang.Shared class PekkoActorTest extends AgentTestRunner { + @Shared def pekkoTester = new PekkoActors() + @Override + void configurePreAgent() { + super.configurePreAgent() + + injectSysConfig(TraceInstrumentationConfig.TRACE_PEKKO_SCHEDULER_ENABLED, "true") + } + def "pekko actor send #name #iterations"() { setup: def barrier = pekkoTester.block(name) @@ -54,6 +63,36 @@ class PekkoActorTest extends AgentTestRunner { "route" | "Rekko" | "How you doin'" | 10 } + def "pekko actor scheduling"() { + when: + def scheduleBarrier = pekkoTester.schedule() + scheduleBarrier.acquire() + + then: + assertTraces(1) { + trace(2) { + sortSpansByStart() + span { + resourceName "PekkoActors.schedule" + operationName "schedulerSpan" + tags { + "$Tags.COMPONENT" "trace" + defaultTags() + } + } + span { + resourceName "Scheduler.tracedChild" + operationName "scheduledOperationSpan" + childOf(span(0)) + tags { + "$Tags.COMPONENT" "trace" + defaultTags() + } + } + } + } + } + def "actor message handling should close leaked scopes"() { when: pekkoTester.leak("Leaker", "drip") diff --git a/dd-java-agent/instrumentation/pekko-concurrent/src/test/scala/PekkoActors.scala b/dd-java-agent/instrumentation/pekko-concurrent/src/test/scala/PekkoActors.scala index ac6d226d709..f86729a3813 100644 --- a/dd-java-agent/instrumentation/pekko-concurrent/src/test/scala/PekkoActors.scala +++ b/dd-java-agent/instrumentation/pekko-concurrent/src/test/scala/PekkoActors.scala @@ -1,15 +1,12 @@ +import Scheduler.Schedule + import java.util.concurrent.Semaphore import org.apache.pekko.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props} import org.apache.pekko.pattern.ask import org.apache.pekko.routing.RoundRobinPool import org.apache.pekko.util.Timeout import datadog.trace.api.Trace -import datadog.trace.bootstrap.instrumentation.api.AgentTracer.{ - activateSpan, - setAsyncPropagationEnabled, - activeSpan, - startSpan -} +import datadog.trace.bootstrap.instrumentation.api.AgentTracer.{activateSpan, activeSpan, setAsyncPropagationEnabled, startSpan} import scala.concurrent.duration._ @@ -21,6 +18,9 @@ class PekkoActors extends AutoCloseable { system.actorOf(Forwarder.props(receiver), "forwarder") val router: ActorRef = system.actorOf(RoundRobinPool(5).props(Props[Receiver]()), "router") + val scheduler: ActorRef = + system.actorOf(Scheduler.props, "scheduler") + val tellGreeter: ActorRef = system.actorOf(Greeter.props("Howdy", receiver), "tell-greeter") val askGreeter: ActorRef = @@ -72,6 +72,15 @@ class PekkoActors extends AutoCloseable { tellGreeter ! WhoToGreet(who) tellGreeter ! Leak(leak) } + + @Trace + def schedule(): Semaphore = { + val barrier = new Semaphore(0) + setAsyncPropagationEnabled(true) + activeSpan().setSpanName("schedulerSpan") + scheduler ! Schedule(barrier) + barrier + } } object PekkoActors { @@ -162,3 +171,28 @@ class Forwarder(receiverActor: ActorRef) extends Actor with ActorLogging { } } } + +object Scheduler { + def props: Props = Props[Scheduler] + + final case class Schedule(barrier: Semaphore) + final case class Execute(barrier: Semaphore) +} + +class Scheduler extends Actor with ActorLogging { + import Scheduler._ + import context.dispatcher + + def receive = { + case Schedule(barrier) => + context.system.scheduler.scheduleOnce(1.second, self, Execute(barrier)) + case Execute(barrier) => + tracedChild(barrier) + } + + @Trace + def tracedChild(barrier: Semaphore): Unit = { + activeSpan().setSpanName("scheduledOperationSpan") + barrier.release(1) + } +} diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java index 687f68aa494..29ed7f88920 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java @@ -167,6 +167,8 @@ public final class TraceInstrumentationConfig { "trace.websocket.messages.separate.traces"; public static final String TRACE_WEBSOCKET_TAG_SESSION_ID = "trace.websocket.tag.session-id"; + public static final String TRACE_PEKKO_SCHEDULER_ENABLED = "trace.pekko.scheduler.enabled"; + public static final String JAX_RS_EXCEPTION_AS_ERROR_ENABLED = "trace.jax-rs.exception-as-error.enabled"; public static final String JAX_RS_ADDITIONAL_ANNOTATIONS = "trace.jax-rs.additional.annotations"; diff --git a/internal-api/src/main/java/datadog/trace/api/InstrumenterConfig.java b/internal-api/src/main/java/datadog/trace/api/InstrumenterConfig.java index d06b1a7a33f..0fda23e42e3 100644 --- a/internal-api/src/main/java/datadog/trace/api/InstrumenterConfig.java +++ b/internal-api/src/main/java/datadog/trace/api/InstrumenterConfig.java @@ -64,6 +64,7 @@ import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_EXTENSIONS_PATH; import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_METHODS; import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_OTEL_ENABLED; +import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_PEKKO_SCHEDULER_ENABLED; import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_THREAD_POOL_EXECUTORS_EXCLUDE; import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_WEBSOCKET_MESSAGES_ENABLED; import static datadog.trace.api.config.UsmConfig.USM_ENABLED; @@ -126,6 +127,7 @@ public class InstrumenterConfig { private final String httpURLConnectionClassName; private final String axisTransportClassName; private final boolean websocketTracingEnabled; + private final boolean pekkoSchedulerEnabled; private final boolean directAllocationProfilingEnabled; @@ -278,6 +280,7 @@ private InstrumenterConfig() { this.websocketTracingEnabled = configProvider.getBoolean( TRACE_WEBSOCKET_MESSAGES_ENABLED, DEFAULT_WEBSOCKET_MESSAGES_ENABLED); + this.pekkoSchedulerEnabled = configProvider.getBoolean(TRACE_PEKKO_SCHEDULER_ENABLED, false); } public boolean isCodeOriginEnabled() { @@ -505,6 +508,10 @@ public boolean isWebsocketTracingEnabled() { return websocketTracingEnabled; } + public boolean isPekkoSchedulerEnabled() { + return pekkoSchedulerEnabled; + } + /** * Check whether asynchronous result types are supported with @Trace annotation. * @@ -637,6 +644,8 @@ public String toString() { + additionalJaxRsAnnotations + ", websocketTracingEnabled=" + websocketTracingEnabled + + ", pekkoSchedulerEnabled=" + + pekkoSchedulerEnabled + '}'; } }