Skip to content

Commit 71f16b6

Browse files
evanchoolymanuel-alvarez-alvarez
authored andcommitted
Add code origin support to kafka message listeners (#8301)
1 parent 50c7025 commit 71f16b6

File tree

11 files changed

+353
-99
lines changed

11 files changed

+353
-99
lines changed

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerTransformer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ public byte[] transform(
245245

246246
private boolean skipInstrumentation(ClassLoader loader, String classFilePath) {
247247
if (definitionMatcher.isEmpty()) {
248-
log.warn("No debugger definitions present.");
248+
log.debug("No debugger definitions present.");
249249
return true;
250250
}
251251
if (classFilePath == null) {

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/instrumentation/MethodInfo.java

+6
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,10 @@ public String getSourceFileName() {
5858
public String getTypeName() {
5959
return Strings.getClassName(classNode.name);
6060
}
61+
62+
@Override
63+
public String toString() {
64+
return String.format(
65+
"MethodInfo{classNode=%s, methodNode=%s}", classNode.name, methodNode.desc);
66+
}
6167
}

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/CodeOriginProbe.java

+33
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static java.util.Arrays.asList;
77
import static java.util.Collections.singletonList;
88

9+
import com.datadog.debugger.agent.Generated;
910
import com.datadog.debugger.instrumentation.CodeOriginInstrumentor;
1011
import com.datadog.debugger.instrumentation.DiagnosticMessage;
1112
import com.datadog.debugger.instrumentation.InstrumentationResult.Status;
@@ -17,7 +18,9 @@
1718
import datadog.trace.bootstrap.debugger.ProbeLocation;
1819
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1920
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
21+
import java.util.Arrays;
2022
import java.util.List;
23+
import java.util.Objects;
2124
import org.slf4j.Logger;
2225
import org.slf4j.LoggerFactory;
2326

@@ -89,4 +92,34 @@ public void buildLocation(MethodInfo methodInfo) {
8992
}
9093
this.location = new ProbeLocation(type, method, file, lines);
9194
}
95+
96+
@Generated
97+
@Override
98+
public boolean equals(Object o) {
99+
if (o == null || getClass() != o.getClass()) return false;
100+
CodeOriginProbe that = (CodeOriginProbe) o;
101+
return Objects.equals(language, that.language)
102+
&& Objects.equals(id, that.id)
103+
&& version == that.version
104+
&& Arrays.equals(tags, that.tags)
105+
&& Objects.equals(tagMap, that.tagMap)
106+
&& Objects.equals(where, that.where)
107+
&& Objects.equals(evaluateAt, that.evaluateAt)
108+
&& entrySpanProbe == that.entrySpanProbe;
109+
}
110+
111+
@Generated
112+
@Override
113+
public int hashCode() {
114+
int result = Objects.hash(language, id, version, tagMap, where, evaluateAt, entrySpanProbe);
115+
result = 31 * result + Arrays.hashCode(tags);
116+
return result;
117+
}
118+
119+
@Override
120+
public String toString() {
121+
return String.format(
122+
"CodeOriginProbe{probeId=%s, entrySpanProbe=%s, signature=%s, where=%s, location=%s}",
123+
probeId, entrySpanProbe, signature, where, location);
124+
}
92125
}

dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcCodeOriginTest.groovy

+9-98
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
import datadog.trace.bootstrap.debugger.DebuggerContext.CodeOriginRecorder
21
import com.google.common.util.concurrent.MoreExecutors
32
import datadog.trace.agent.test.naming.VersionedNamingTestBase
4-
import datadog.trace.api.DDSpanTypes
53
import datadog.trace.bootstrap.debugger.DebuggerContext
6-
import datadog.trace.bootstrap.instrumentation.api.Tags
74
import example.GreeterGrpc
85
import example.Helloworld
96
import io.grpc.BindableService
@@ -13,17 +10,15 @@ import io.grpc.inprocess.InProcessChannelBuilder
1310
import io.grpc.inprocess.InProcessServerBuilder
1411
import io.grpc.stub.StreamObserver
1512

16-
import java.lang.reflect.Method
1713
import java.util.concurrent.CopyOnWriteArrayList
1814
import java.util.concurrent.Executors
1915
import java.util.concurrent.TimeUnit
2016
import java.util.concurrent.atomic.AtomicReference
2117

22-
import static datadog.trace.api.config.TraceInstrumentationConfig.*
18+
import static datadog.trace.agent.test.asserts.TagsAssert.assertTags
2319

24-
abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
25-
def codeOriginRecorder
2620

21+
abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
2722
@Override
2823
final String service() {
2924
return null
@@ -56,7 +51,7 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
5651
codeOriginSetup()
5752
}
5853

59-
def "test conversation #name"() {
54+
def "code origin test #name"() {
6055
setup:
6156

6257
def msgCount = serverMessageCount
@@ -155,77 +150,13 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
155150
}
156151
}.flatten().sort()
157152

158-
159-
assert codeOriginRecorder.invoked
160-
assertTraces(2) {
161-
trace((hasClientMessageSpans() ? clientMessageCount * serverMessageCount : 0) + 1) {
162-
span {
163-
operationName clientOperation()
164-
resourceName "example.Greeter/Conversation"
165-
spanType DDSpanTypes.RPC
166-
parent()
167-
errored false
168-
tags {
169-
"$Tags.COMPONENT" "grpc-client"
170-
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
171-
"$Tags.RPC_SERVICE" "example.Greeter"
172-
"status.code" "OK"
173-
"request.type" "example.Helloworld\$Response"
174-
"response.type" "example.Helloworld\$Response"
175-
peerServiceFrom(Tags.RPC_SERVICE)
176-
defaultTags()
177-
}
178-
}
179-
if (hasClientMessageSpans()) {
180-
(1..(clientMessageCount * serverMessageCount)).each {
181-
span {
182-
operationName "grpc.message"
183-
resourceName "grpc.message"
184-
spanType DDSpanTypes.RPC
185-
childOf span(0)
186-
errored false
187-
tags {
188-
"$Tags.COMPONENT" "grpc-client"
189-
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
190-
"message.type" "example.Helloworld\$Response"
191-
defaultTagsNoPeerService()
192-
}
193-
}
194-
}
195-
}
196-
}
197-
trace(clientMessageCount + 1) {
198-
span {
199-
operationName serverOperation()
200-
resourceName "example.Greeter/Conversation"
201-
spanType DDSpanTypes.RPC
202-
childOf trace(0).get(0)
203-
errored false
204-
tags {
205-
"$Tags.COMPONENT" "grpc-server"
206-
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
207-
"status.code" "OK"
208-
209-
defaultTags(true)
210-
}
211-
}
212-
clientRange.each {
213-
span {
214-
operationName "grpc.message"
215-
resourceName "grpc.message"
216-
spanType DDSpanTypes.RPC
217-
childOf span(0)
218-
errored false
219-
tags {
220-
"$Tags.COMPONENT" "grpc-server"
221-
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
222-
"message.type" "example.Helloworld\$Response"
223-
defaultTags()
224-
}
225-
}
226-
}
227-
}
153+
assert DebuggerContext.codeOriginRecorder != null
154+
def span = TEST_WRITER.flatten().find {
155+
it.operationName.toString() == "grpc.server.request"
228156
}
157+
assertTags(span, {
158+
it.codeOriginTags()
159+
}, false)
229160

230161
cleanup:
231162
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
@@ -247,26 +178,6 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
247178
clientRange = 1..clientMessageCount
248179
serverRange = 1..serverMessageCount
249180
}
250-
251-
252-
void codeOriginSetup() {
253-
injectSysConfig(CODE_ORIGIN_FOR_SPANS_ENABLED, "true", true)
254-
codeOriginRecorder = new CodeOriginRecorder() {
255-
def invoked = false
256-
@Override
257-
String captureCodeOrigin(boolean entry) {
258-
invoked = true
259-
return "done"
260-
}
261-
262-
@Override
263-
String captureCodeOrigin(Method method, boolean entry) {
264-
invoked = true
265-
return "done"
266-
}
267-
}
268-
DebuggerContext.initCodeOrigin(codeOriginRecorder)
269-
}
270181
}
271182

272183
class GrpcCodeOriginForkedTest extends GrpcCodeOriginTest {

dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ dependencies {
3838
implementation project(':dd-java-agent:instrumentation:kafka-common')
3939
main_java17Implementation project(':dd-java-agent:instrumentation:kafka-common')
4040

41+
implementation project(':dd-java-agent:instrumentation:span-origin')
42+
4143
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
4244
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.1.0'
4345
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.1.0'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package datadog.trace.instrumentation.kafka_clients38;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
4+
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
5+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
6+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
7+
8+
import com.google.auto.service.AutoService;
9+
import datadog.trace.agent.tooling.Instrumenter;
10+
import datadog.trace.agent.tooling.InstrumenterModule;
11+
import datadog.trace.api.InstrumenterConfig;
12+
import net.bytebuddy.description.type.TypeDescription;
13+
import net.bytebuddy.matcher.ElementMatcher;
14+
15+
@AutoService(InstrumenterModule.class)
16+
public class MessageListenerInstrumentation extends InstrumenterModule.Tracing
17+
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {
18+
19+
public MessageListenerInstrumentation() {
20+
super("kafka", "kafka-3.8");
21+
}
22+
23+
@Override
24+
public boolean isEnabled() {
25+
return InstrumenterConfig.get().isCodeOriginEnabled() && super.isEnabled();
26+
}
27+
28+
@Override
29+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
30+
return hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy"); // since 3.8
31+
}
32+
33+
@Override
34+
public String hierarchyMarkerType() {
35+
return "org.springframework.kafka.listener.MessageListener";
36+
}
37+
38+
@Override
39+
public ElementMatcher<TypeDescription> hierarchyMatcher() {
40+
return implementsInterface(named(hierarchyMarkerType()));
41+
}
42+
43+
@Override
44+
public void methodAdvice(MethodTransformer transformer) {
45+
transformer.applyAdvice(
46+
isMethod().and(named("onMessage")),
47+
datadog.trace.instrumentation.codeorigin.EntrySpanOriginAdvice.class.getName());
48+
}
49+
}

0 commit comments

Comments
 (0)