Skip to content

Add span propagation for Pekko scheduled tasks #8765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package datadog.trace.instrumentation.pekko.concurrent;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.InstrumenterConfig;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;

/** Active span capturing and continuation for Pekko's async scheduled tasks. */
@AutoService(InstrumenterModule.class)
public class PekkoSchedulerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public PekkoSchedulerInstrumentation() {
super("java_concurrent", "pekko_concurrent", "pekko_scheduler");
}

@Override
protected boolean defaultEnabled() {
return InstrumenterConfig.get().isPekkoSchedulerEnabled();
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap(Runnable.class.getName(), State.class.getName());
}

@Override
public String instrumentedType() {
return "org.apache.pekko.actor.LightArrayRevolverScheduler";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod()
.and(nameEndsWith("schedule"))
.and(takesArgument(0, named("scala.concurrent.ExecutionContext")))
.and(takesArgument(1, Runnable.class))
.and(takesArgument(2, named("scala.concurrent.duration.FiniteDuration"))),
getClass().getName() + "$Schedule");
}

public static final class Schedule {
@Advice.OnMethodEnter
public static void schedule(@Advice.Argument(1) Runnable runnable) {
capture(InstrumentationContext.get(Runnable.class, State.class), runnable);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.config.TraceInstrumentationConfig
import datadog.trace.bootstrap.instrumentation.api.Tags
import spock.lang.Shared

class PekkoActorTest extends AgentTestRunner {

@Shared
def pekkoTester = new PekkoActors()

@Override
void configurePreAgent() {
super.configurePreAgent()

injectSysConfig(TraceInstrumentationConfig.TRACE_PEKKO_SCHEDULER_ENABLED, "true")
}

def "pekko actor send #name #iterations"() {
setup:
def barrier = pekkoTester.block(name)
Expand Down Expand Up @@ -54,6 +63,36 @@ class PekkoActorTest extends AgentTestRunner {
"route" | "Rekko" | "How you doin'" | 10
}

def "pekko actor scheduling"() {
when:
def scheduleBarrier = pekkoTester.schedule()
scheduleBarrier.acquire()

then:
assertTraces(1) {
trace(2) {
sortSpansByStart()
span {
resourceName "PekkoActors.schedule"
operationName "schedulerSpan"
tags {
"$Tags.COMPONENT" "trace"
defaultTags()
}
}
span {
resourceName "Scheduler.tracedChild"
operationName "scheduledOperationSpan"
childOf(span(0))
tags {
"$Tags.COMPONENT" "trace"
defaultTags()
}
}
}
}
}

def "actor message handling should close leaked scopes"() {
when:
pekkoTester.leak("Leaker", "drip")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import Scheduler.Schedule

import java.util.concurrent.Semaphore
import org.apache.pekko.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import org.apache.pekko.pattern.ask
import org.apache.pekko.routing.RoundRobinPool
import org.apache.pekko.util.Timeout
import datadog.trace.api.Trace
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.{
activateSpan,
setAsyncPropagationEnabled,
activeSpan,
startSpan
}
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.{activateSpan, activeSpan, setAsyncPropagationEnabled, startSpan}

import scala.concurrent.duration._

Expand All @@ -21,6 +18,9 @@ class PekkoActors extends AutoCloseable {
system.actorOf(Forwarder.props(receiver), "forwarder")
val router: ActorRef =
system.actorOf(RoundRobinPool(5).props(Props[Receiver]()), "router")
val scheduler: ActorRef =
system.actorOf(Scheduler.props, "scheduler")

val tellGreeter: ActorRef =
system.actorOf(Greeter.props("Howdy", receiver), "tell-greeter")
val askGreeter: ActorRef =
Expand Down Expand Up @@ -72,6 +72,15 @@ class PekkoActors extends AutoCloseable {
tellGreeter ! WhoToGreet(who)
tellGreeter ! Leak(leak)
}

@Trace
def schedule(): Semaphore = {
val barrier = new Semaphore(0)
setAsyncPropagationEnabled(true)
activeSpan().setSpanName("schedulerSpan")
scheduler ! Schedule(barrier)
barrier
}
}

object PekkoActors {
Expand Down Expand Up @@ -162,3 +171,28 @@ class Forwarder(receiverActor: ActorRef) extends Actor with ActorLogging {
}
}
}

object Scheduler {
def props: Props = Props[Scheduler]

final case class Schedule(barrier: Semaphore)
final case class Execute(barrier: Semaphore)
}

class Scheduler extends Actor with ActorLogging {
import Scheduler._
import context.dispatcher

def receive = {
case Schedule(barrier) =>
context.system.scheduler.scheduleOnce(1.second, self, Execute(barrier))
case Execute(barrier) =>
tracedChild(barrier)
}

@Trace
def tracedChild(barrier: Semaphore): Unit = {
activeSpan().setSpanName("scheduledOperationSpan")
barrier.release(1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public final class TraceInstrumentationConfig {
"trace.websocket.messages.separate.traces";
public static final String TRACE_WEBSOCKET_TAG_SESSION_ID = "trace.websocket.tag.session-id";

public static final String TRACE_PEKKO_SCHEDULER_ENABLED = "trace.pekko.scheduler.enabled";

public static final String JAX_RS_EXCEPTION_AS_ERROR_ENABLED =
"trace.jax-rs.exception-as-error.enabled";
public static final String JAX_RS_ADDITIONAL_ANNOTATIONS = "trace.jax-rs.additional.annotations";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_EXTENSIONS_PATH;
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_METHODS;
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_OTEL_ENABLED;
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_PEKKO_SCHEDULER_ENABLED;
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_THREAD_POOL_EXECUTORS_EXCLUDE;
import static datadog.trace.api.config.TraceInstrumentationConfig.TRACE_WEBSOCKET_MESSAGES_ENABLED;
import static datadog.trace.api.config.UsmConfig.USM_ENABLED;
Expand Down Expand Up @@ -126,6 +127,7 @@ public class InstrumenterConfig {
private final String httpURLConnectionClassName;
private final String axisTransportClassName;
private final boolean websocketTracingEnabled;
private final boolean pekkoSchedulerEnabled;

private final boolean directAllocationProfilingEnabled;

Expand Down Expand Up @@ -278,6 +280,7 @@ private InstrumenterConfig() {
this.websocketTracingEnabled =
configProvider.getBoolean(
TRACE_WEBSOCKET_MESSAGES_ENABLED, DEFAULT_WEBSOCKET_MESSAGES_ENABLED);
this.pekkoSchedulerEnabled = configProvider.getBoolean(TRACE_PEKKO_SCHEDULER_ENABLED, false);
}

public boolean isCodeOriginEnabled() {
Expand Down Expand Up @@ -505,6 +508,10 @@ public boolean isWebsocketTracingEnabled() {
return websocketTracingEnabled;
}

public boolean isPekkoSchedulerEnabled() {
return pekkoSchedulerEnabled;
}

/**
* Check whether asynchronous result types are supported with @Trace annotation.
*
Expand Down Expand Up @@ -637,6 +644,8 @@ public String toString() {
+ additionalJaxRsAnnotations
+ ", websocketTracingEnabled="
+ websocketTracingEnabled
+ ", pekkoSchedulerEnabled="
+ pekkoSchedulerEnabled
+ '}';
}
}