Skip to content

Reactor: early propagate span in context when subscribing #8166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
import reactor.core.publisher.Hooks
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.util.context.Context
import spock.lang.Shared

import java.time.Duration
import java.util.concurrent.CompletableFuture

class ReactorCoreTest extends AgentTestRunner {

Expand Down Expand Up @@ -443,12 +443,41 @@ class ReactorCoreTest extends AgentTestRunner {

def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() {
when:
def mono = Flux.range(1, 100).windowUntil {it % 10 == 0}.count()
def mono = Flux.range(1, 100).windowUntil { it % 10 == 0 }.count()
then:
// we are not interested into asserting a trace structure but only that the instrumentation error count is 0
assert mono.block() == 10
}

def "span in the context has to be activated when the publisher subscribes"() {
when:
// the mono is subscribed (block) when first is active.
// However we expect that the span third will have second as parent and not first
// because we set the parent explicitly in the reactor context (dd.span key)
def result = runUnderTrace("first", {
runUnderTrace("second", {
def mono = Mono.defer {
Mono.fromCompletionStage(CompletableFuture.supplyAsync {
runUnderTrace("third", {
"hello world"
})
})
}.contextWrite(Context.of("dd.span", TEST_TRACER.activeSpan()))
mono
})
.block()
})
then:
assert result == "hello world"
assertTraces(1, {
trace(3, true) {
basicSpan(it, "first")
basicSpan(it, "second", span(0))
basicSpan(it, "third", span(1))
}
})
}


@Trace(operationName = "trace-parent", resourceName = "trace-parent")
def assemblePublisherUnderTrace(def publisherSupplier) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package datadog.trace.instrumentation.reactor.core;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan;
import javax.annotation.Nullable;
import reactor.core.CoreSubscriber;
import reactor.util.context.Context;

public class ContextSpanHelper {
private static final String DD_SPAN_KEY = "dd.span";

private ContextSpanHelper() {}

@Nullable
public static AgentSpan extractSpanFromSubscriberContext(final CoreSubscriber<?> subscriber) {
if (subscriber == null) {
return null;
}
Context context = null;
try {
context = subscriber.currentContext();
} catch (Throwable ignored) {
}
if (context == null) {
return null;
}
if (context.hasKey(DD_SPAN_KEY)) {
Object maybeSpan = context.get(DD_SPAN_KEY);
if (maybeSpan instanceof WithAgentSpan) {
return ((WithAgentSpan) maybeSpan).asAgentSpan();
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package datadog.trace.instrumentation.reactor.core;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasSuperType;
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractSpanFromSubscriberContext;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;

@AutoService(InstrumenterModule.class)
public class CorePublisherInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {
public CorePublisherInstrumentation() {
super("reactor-core");
}

@Override
public String hierarchyMarkerType() {
return "reactor.core.CoreSubscriber";
}

@Override
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return implementsInterface(named("reactor.core.CorePublisher")) // from 3.1.7
.or(
hasSuperType(
namedOneOf(
"reactor.core.publisher.Mono", "reactor.core.publisher.Flux"))); // < 3.1.7
}

@Override
public Map<String, String> contextStore() {
final Map<String, String> ret = new HashMap<>();
ret.put("org.reactivestreams.Subscriber", AgentSpan.class.getName());
ret.put("org.reactivestreams.Publisher", AgentSpan.class.getName());
return ret;
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".ContextSpanHelper",
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
named("subscribe")
.and(not(isStatic()))
.and(takesArguments(1))
.and(takesArgument(0, named("reactor.core.CoreSubscriber"))),
getClass().getName() + "$PropagateContextSpanOnSubscribe");
}

public static class PropagateContextSpanOnSubscribe {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope before(
@Advice.This Publisher<?> self, @Advice.Argument(0) final CoreSubscriber<?> subscriber) {
final AgentSpan span = extractSpanFromSubscriberContext(subscriber);

if (span != null) {
// we force storing the span state linked to publisher and subscriber to the one explicitly
// present in the context so that, if PublisherInstrumentation is kicking in after this
// advice, it won't override that active span
InstrumentationContext.get(Publisher.class, AgentSpan.class).put(self, span);
InstrumentationContext.get(Subscriber.class, AgentSpan.class).put(subscriber, span);
return activateSpan(span);
}
return null;
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void after(@Advice.Enter final AgentScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractSpanFromSubscriberContext;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.core.CoreSubscriber;
import reactor.util.context.Context;

@AutoService(InstrumenterModule.class)
public class CoreSubscriberInstrumentation extends InstrumenterModule.Tracing
Expand All @@ -34,6 +33,13 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
return implementsInterface(named(hierarchyMarkerType()));
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".ContextSpanHelper",
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
Expand All @@ -44,22 +50,9 @@ public void methodAdvice(MethodTransformer transformer) {
public static class PropagateSpanInScopeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope before(@Advice.This final CoreSubscriber<?> self) {
Context context = null;
try {
context = self.currentContext();
} catch (Throwable ignored) {
}
if (context == null) {
return null;
}
if (context.hasKey("dd.span")) {
Object maybeSpan = context.get("dd.span");
if (maybeSpan instanceof WithAgentSpan) {
AgentSpan span = ((WithAgentSpan) maybeSpan).asAgentSpan();
if (span != null) {
return activateSpan(span);
}
}
final AgentSpan span = extractSpanFromSubscriberContext(self);
if (span != null) {
return activateSpan(span);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import reactor.util.context.Context
import spock.lang.Shared

import java.time.Duration
import java.util.concurrent.CompletableFuture

class ReactorCoreTest extends AgentTestRunner {

Expand Down Expand Up @@ -440,6 +441,44 @@ class ReactorCoreTest extends AgentTestRunner {
})
}

def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() {
when:
def mono = Flux.range(1, 100).windowUntil { it % 10 == 0 }.count()
then:
// we are not interested into asserting a trace structure but only that the instrumentation error count is 0
assert mono.block() == 11
}


def "span in the context has to be activated when the publisher subscribes"() {
when:
// the mono is subscribed (block) when first is active.
// However we expect that the span third will have second as parent and not first
// because we set the parent explicitly in the reactor context (dd.span key)
def result = runUnderTrace("first", {
runUnderTrace("second", {
def mono = Mono.defer {
Mono.fromCompletionStage(CompletableFuture.supplyAsync {
runUnderTrace("third", {
"hello world"
})
})
}.subscriberContext(Context.of("dd.span", TEST_TRACER.activeSpan()))
mono
})
.block()
})
then:
assert result == "hello world"
assertTraces(1, {
trace(3, true) {
basicSpan(it, "first")
basicSpan(it, "second", span(0))
basicSpan(it, "third", span(1))
}
})
}

@Trace(operationName = "trace-parent", resourceName = "trace-parent")
def assemblePublisherUnderTrace(def publisherSupplier) {
def span = startSpan("publisher-parent")
Expand Down Expand Up @@ -490,14 +529,6 @@ class ReactorCoreTest extends AgentTestRunner {
span.finish()
}

def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() {
when:
def mono = Flux.range(1, 100).windowUntil {it % 10 == 0}.count()
then:
// we are not interested into asserting a trace structure but only that the instrumentation error count is 0
assert mono.block() == 11
}

@Trace(operationName = "addOne", resourceName = "addOne")
def static addOneFunc(int i) {
return i + 1
Expand Down
Loading