Skip to content

Commit 3e097b6

Browse files
authored
Add code origin support to kafka message listeners (#8301)
1 parent f34e5f4 commit 3e097b6

File tree

11 files changed

+323
-114
lines changed

11 files changed

+323
-114
lines changed

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerTransformer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ public byte[] transform(
251251

252252
private boolean skipInstrumentation(ClassLoader loader, String classFilePath) {
253253
if (definitionMatcher.isEmpty()) {
254-
log.warn("No debugger definitions present.");
254+
log.debug("No debugger definitions present.");
255255
return true;
256256
}
257257
if (classFilePath == null) {

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/instrumentation/MethodInfo.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,10 @@ public String getSourceFileName() {
5858
public String getTypeName() {
5959
return Strings.getClassName(classNode.name);
6060
}
61+
62+
@Override
63+
public String toString() {
64+
return String.format(
65+
"MethodInfo{classNode=%s, methodNode=%s}", classNode.name, methodNode.desc);
66+
}
6167
}

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/CodeOriginProbe.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -118,20 +118,8 @@ public int hashCode() {
118118

119119
@Override
120120
public String toString() {
121-
return "CodeOriginProbe{"
122-
+ "id='"
123-
+ id
124-
+ '\''
125-
+ ", version="
126-
+ version
127-
+ ", tags="
128-
+ Arrays.toString(tags)
129-
+ ", where="
130-
+ where
131-
+ ", evaluateAt="
132-
+ evaluateAt
133-
+ ", entrySpanProbe="
134-
+ entrySpanProbe
135-
+ "} ";
121+
return String.format(
122+
"CodeOriginProbe{probeId=%s, entrySpanProbe=%s, signature=%s, where=%s, location=%s}",
123+
probeId, entrySpanProbe, signature, where, location);
136124
}
137125
}

dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcCodeOriginTest.groovy

Lines changed: 9 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
import datadog.trace.bootstrap.debugger.DebuggerContext.CodeOriginRecorder
21
import com.google.common.util.concurrent.MoreExecutors
32
import datadog.trace.agent.test.naming.VersionedNamingTestBase
4-
import datadog.trace.api.DDSpanTypes
53
import datadog.trace.bootstrap.debugger.DebuggerContext
6-
import datadog.trace.bootstrap.instrumentation.api.Tags
74
import example.GreeterGrpc
85
import example.Helloworld
96
import io.grpc.BindableService
@@ -13,17 +10,15 @@ import io.grpc.inprocess.InProcessChannelBuilder
1310
import io.grpc.inprocess.InProcessServerBuilder
1411
import io.grpc.stub.StreamObserver
1512

16-
import java.lang.reflect.Method
1713
import java.util.concurrent.CopyOnWriteArrayList
1814
import java.util.concurrent.Executors
1915
import java.util.concurrent.TimeUnit
2016
import java.util.concurrent.atomic.AtomicReference
2117

22-
import static datadog.trace.api.config.TraceInstrumentationConfig.*
18+
import static datadog.trace.agent.test.asserts.TagsAssert.assertTags
2319

24-
abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
25-
def codeOriginRecorder
2620

21+
abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
2722
@Override
2823
final String service() {
2924
return null
@@ -56,7 +51,7 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
5651
codeOriginSetup()
5752
}
5853

59-
def "test conversation #name"() {
54+
def "code origin test #name"() {
6055
setup:
6156

6257
def msgCount = serverMessageCount
@@ -155,77 +150,13 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
155150
}
156151
}.flatten().sort()
157152

158-
159-
assert codeOriginRecorder.invoked
160-
assertTraces(2) {
161-
trace((hasClientMessageSpans() ? clientMessageCount * serverMessageCount : 0) + 1) {
162-
span {
163-
operationName clientOperation()
164-
resourceName "example.Greeter/Conversation"
165-
spanType DDSpanTypes.RPC
166-
parent()
167-
errored false
168-
tags {
169-
"$Tags.COMPONENT" "grpc-client"
170-
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
171-
"$Tags.RPC_SERVICE" "example.Greeter"
172-
"status.code" "OK"
173-
"request.type" "example.Helloworld\$Response"
174-
"response.type" "example.Helloworld\$Response"
175-
peerServiceFrom(Tags.RPC_SERVICE)
176-
defaultTags()
177-
}
178-
}
179-
if (hasClientMessageSpans()) {
180-
(1..(clientMessageCount * serverMessageCount)).each {
181-
span {
182-
operationName "grpc.message"
183-
resourceName "grpc.message"
184-
spanType DDSpanTypes.RPC
185-
childOf span(0)
186-
errored false
187-
tags {
188-
"$Tags.COMPONENT" "grpc-client"
189-
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
190-
"message.type" "example.Helloworld\$Response"
191-
defaultTagsNoPeerService()
192-
}
193-
}
194-
}
195-
}
196-
}
197-
trace(clientMessageCount + 1) {
198-
span {
199-
operationName serverOperation()
200-
resourceName "example.Greeter/Conversation"
201-
spanType DDSpanTypes.RPC
202-
childOf trace(0).get(0)
203-
errored false
204-
tags {
205-
"$Tags.COMPONENT" "grpc-server"
206-
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
207-
"status.code" "OK"
208-
209-
defaultTags(true)
210-
}
211-
}
212-
clientRange.each {
213-
span {
214-
operationName "grpc.message"
215-
resourceName "grpc.message"
216-
spanType DDSpanTypes.RPC
217-
childOf span(0)
218-
errored false
219-
tags {
220-
"$Tags.COMPONENT" "grpc-server"
221-
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
222-
"message.type" "example.Helloworld\$Response"
223-
defaultTags()
224-
}
225-
}
226-
}
227-
}
153+
assert DebuggerContext.codeOriginRecorder != null
154+
def span = TEST_WRITER.flatten().find {
155+
it.operationName.toString() == "grpc.server.request"
228156
}
157+
assertTags(span, {
158+
it.codeOriginTags()
159+
}, false)
229160

230161
cleanup:
231162
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
@@ -247,26 +178,6 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
247178
clientRange = 1..clientMessageCount
248179
serverRange = 1..serverMessageCount
249180
}
250-
251-
252-
void codeOriginSetup() {
253-
injectSysConfig(CODE_ORIGIN_FOR_SPANS_ENABLED, "true", true)
254-
codeOriginRecorder = new CodeOriginRecorder() {
255-
def invoked = false
256-
@Override
257-
String captureCodeOrigin(boolean entry) {
258-
invoked = true
259-
return "done"
260-
}
261-
262-
@Override
263-
String captureCodeOrigin(Method method, boolean entry) {
264-
invoked = true
265-
return "done"
266-
}
267-
}
268-
DebuggerContext.initCodeOrigin(codeOriginRecorder)
269-
}
270181
}
271182

272183
class GrpcCodeOriginForkedTest extends GrpcCodeOriginTest {

dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ dependencies {
3838
implementation project(':dd-java-agent:instrumentation:kafka-common')
3939
main_java17Implementation project(':dd-java-agent:instrumentation:kafka-common')
4040

41+
implementation project(':dd-java-agent:instrumentation:span-origin')
42+
4143
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
4244
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.1.0'
4345
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.1.0'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package datadog.trace.instrumentation.kafka_clients38;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
4+
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
5+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
6+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
7+
8+
import com.google.auto.service.AutoService;
9+
import datadog.trace.agent.tooling.Instrumenter;
10+
import datadog.trace.agent.tooling.InstrumenterModule;
11+
import datadog.trace.api.InstrumenterConfig;
12+
import net.bytebuddy.description.type.TypeDescription;
13+
import net.bytebuddy.matcher.ElementMatcher;
14+
15+
@AutoService(InstrumenterModule.class)
16+
public class MessageListenerInstrumentation extends InstrumenterModule.Tracing
17+
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {
18+
19+
public MessageListenerInstrumentation() {
20+
super("kafka", "kafka-3.8");
21+
}
22+
23+
@Override
24+
public boolean isEnabled() {
25+
return InstrumenterConfig.get().isCodeOriginEnabled() && super.isEnabled();
26+
}
27+
28+
@Override
29+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
30+
return hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy"); // since 3.8
31+
}
32+
33+
@Override
34+
public String hierarchyMarkerType() {
35+
return "org.springframework.kafka.listener.MessageListener";
36+
}
37+
38+
@Override
39+
public ElementMatcher<TypeDescription> hierarchyMatcher() {
40+
return implementsInterface(named(hierarchyMarkerType()));
41+
}
42+
43+
@Override
44+
public void methodAdvice(MethodTransformer transformer) {
45+
transformer.applyAdvice(
46+
isMethod().and(named("onMessage")),
47+
datadog.trace.instrumentation.codeorigin.EntrySpanOriginAdvice.class.getName());
48+
}
49+
}

0 commit comments

Comments
 (0)