Skip to content

Add code origin support to kafka message listeners #8301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public byte[] transform(

private boolean skipInstrumentation(ClassLoader loader, String classFilePath) {
if (definitionMatcher.isEmpty()) {
log.warn("No debugger definitions present.");
log.debug("No debugger definitions present.");
return true;
}
if (classFilePath == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,10 @@ public String getSourceFileName() {
public String getTypeName() {
return Strings.getClassName(classNode.name);
}

@Override
public String toString() {
return String.format(
"MethodInfo{classNode=%s, methodNode=%s}", classNode.name, methodNode.desc);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,8 @@ public int hashCode() {

@Override
public String toString() {
return "CodeOriginProbe{"
+ "id='"
+ id
+ '\''
+ ", version="
+ version
+ ", tags="
+ Arrays.toString(tags)
+ ", where="
+ where
+ ", evaluateAt="
+ evaluateAt
+ ", entrySpanProbe="
+ entrySpanProbe
+ "} ";
return String.format(
"CodeOriginProbe{probeId=%s, entrySpanProbe=%s, signature=%s, where=%s, location=%s}",
probeId, entrySpanProbe, signature, where, location);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import datadog.trace.bootstrap.debugger.DebuggerContext.CodeOriginRecorder
import com.google.common.util.concurrent.MoreExecutors
import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.debugger.DebuggerContext
import datadog.trace.bootstrap.instrumentation.api.Tags
import example.GreeterGrpc
import example.Helloworld
import io.grpc.BindableService
Expand All @@ -13,17 +10,15 @@ import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.stub.StreamObserver

import java.lang.reflect.Method
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

import static datadog.trace.api.config.TraceInstrumentationConfig.*
import static datadog.trace.agent.test.asserts.TagsAssert.assertTags

abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
def codeOriginRecorder

abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
@Override
final String service() {
return null
Expand Down Expand Up @@ -56,7 +51,7 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
codeOriginSetup()
}

def "test conversation #name"() {
def "code origin test #name"() {
setup:

def msgCount = serverMessageCount
Expand Down Expand Up @@ -155,77 +150,13 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
}
}.flatten().sort()


assert codeOriginRecorder.invoked
assertTraces(2) {
trace((hasClientMessageSpans() ? clientMessageCount * serverMessageCount : 0) + 1) {
span {
operationName clientOperation()
resourceName "example.Greeter/Conversation"
spanType DDSpanTypes.RPC
parent()
errored false
tags {
"$Tags.COMPONENT" "grpc-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.RPC_SERVICE" "example.Greeter"
"status.code" "OK"
"request.type" "example.Helloworld\$Response"
"response.type" "example.Helloworld\$Response"
peerServiceFrom(Tags.RPC_SERVICE)
defaultTags()
}
}
if (hasClientMessageSpans()) {
(1..(clientMessageCount * serverMessageCount)).each {
span {
operationName "grpc.message"
resourceName "grpc.message"
spanType DDSpanTypes.RPC
childOf span(0)
errored false
tags {
"$Tags.COMPONENT" "grpc-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"message.type" "example.Helloworld\$Response"
defaultTagsNoPeerService()
}
}
}
}
}
trace(clientMessageCount + 1) {
span {
operationName serverOperation()
resourceName "example.Greeter/Conversation"
spanType DDSpanTypes.RPC
childOf trace(0).get(0)
errored false
tags {
"$Tags.COMPONENT" "grpc-server"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
"status.code" "OK"

defaultTags(true)
}
}
clientRange.each {
span {
operationName "grpc.message"
resourceName "grpc.message"
spanType DDSpanTypes.RPC
childOf span(0)
errored false
tags {
"$Tags.COMPONENT" "grpc-server"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
"message.type" "example.Helloworld\$Response"
defaultTags()
}
}
}
}
assert DebuggerContext.codeOriginRecorder != null
def span = TEST_WRITER.flatten().find {
it.operationName.toString() == "grpc.server.request"
}
assertTags(span, {
it.codeOriginTags()
}, false)

cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
Expand All @@ -247,26 +178,6 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
clientRange = 1..clientMessageCount
serverRange = 1..serverMessageCount
}


void codeOriginSetup() {
injectSysConfig(CODE_ORIGIN_FOR_SPANS_ENABLED, "true", true)
codeOriginRecorder = new CodeOriginRecorder() {
def invoked = false
@Override
String captureCodeOrigin(boolean entry) {
invoked = true
return "done"
}

@Override
String captureCodeOrigin(Method method, boolean entry) {
invoked = true
return "done"
}
}
DebuggerContext.initCodeOrigin(codeOriginRecorder)
}
}

class GrpcCodeOriginForkedTest extends GrpcCodeOriginTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ dependencies {
implementation project(':dd-java-agent:instrumentation:kafka-common')
main_java17Implementation project(':dd-java-agent:instrumentation:kafka-common')

implementation project(':dd-java-agent:instrumentation:span-origin')

testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.1.0'
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.1.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package datadog.trace.instrumentation.kafka_clients38;

import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.InstrumenterConfig;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

@AutoService(InstrumenterModule.class)
public class MessageListenerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {

public MessageListenerInstrumentation() {
super("kafka", "kafka-3.8");
}

@Override
public boolean isEnabled() {
return InstrumenterConfig.get().isCodeOriginEnabled() && super.isEnabled();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jpbempel I added this extra check after remembering the other CO advices do this check here to short circuit out in the cases where CO is not enabled.

}

@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
return hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy"); // since 3.8
}

@Override
public String hierarchyMarkerType() {
return "org.springframework.kafka.listener.MessageListener";
}

@Override
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return implementsInterface(named(hierarchyMarkerType()));
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("onMessage")),
datadog.trace.instrumentation.codeorigin.EntrySpanOriginAdvice.class.getName());
}
}
Loading
Loading