Skip to content

Commit 3fd5db0

Browse files
Add messaging.destination.name tag to kafka integrations (#8366)
* add messaging.destination.name tag to kafka integrations * add missing import --------- Co-authored-by: Andrea Marziali <[email protected]>
1 parent 1c9548c commit 3fd5db0

File tree

9 files changed

+23
-0
lines changed

9 files changed

+23
-0
lines changed

dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTestBase.groovy

+2
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
305305
"$Tags.COMPONENT" "java-kafka"
306306
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
307307
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
308+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC"
308309
if (partitioned) {
309310
"$InstrumentationTags.PARTITION" { it >= 0 }
310311
}
@@ -380,6 +381,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
380381
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
381382
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
382383
"$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { it >= 0 }
384+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC"
383385
if (tombstone) {
384386
"$InstrumentationTags.TOMBSTONE" true
385387
}

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP;
44
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS;
5+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.MESSAGING_DESTINATION_NAME;
56
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET;
67
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION;
78
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.RECORD_QUEUE_TIME_MS;
@@ -121,6 +122,7 @@ public void onConsume(
121122
span.setResourceName(CONSUMER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, CONSUMER_PREFIX));
122123
span.setTag(PARTITION, record.partition());
123124
span.setTag(OFFSET, record.offset());
125+
span.setTag(MESSAGING_DESTINATION_NAME, topic);
124126
if (consumerGroup != null) {
125127
span.setTag(CONSUMER_GROUP, consumerGroup);
126128
}
@@ -162,6 +164,7 @@ public void onProduce(
162164
}
163165
final String topic = record.topic() == null ? "kafka" : record.topic();
164166
span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX));
167+
span.setTag(MESSAGING_DESTINATION_NAME, topic);
165168
}
166169
}
167170
}

dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy

+2
Original file line numberDiff line numberDiff line change
@@ -1037,6 +1037,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10371037
"$Tags.COMPONENT" "java-kafka"
10381038
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
10391039
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
1040+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC"
10401041
if (partitioned) {
10411042
"$InstrumentationTags.PARTITION" { it >= 0 }
10421043
}
@@ -1112,6 +1113,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
11121113
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
11131114
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
11141115
"$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { it >= 0 }
1116+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC"
11151117
if (tombstone) {
11161118
"$InstrumentationTags.TOMBSTONE" true
11171119
}

dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy

+2
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ class KafkaReactorForkedTest extends AgentTestRunner {
181181
"$Tags.COMPONENT" "java-kafka"
182182
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
183183
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
184+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$KafkaClientTestBase.SHARED_TOPIC"
184185
peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS)
185186
defaultTags()
186187
}
@@ -211,6 +212,7 @@ class KafkaReactorForkedTest extends AgentTestRunner {
211212
"$InstrumentationTags.CONSUMER_GROUP" "sender"
212213
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
213214
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
215+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$KafkaClientTestBase.SHARED_TOPIC"
214216
defaultTags(true)
215217
}
216218
}

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP;
44
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS;
5+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.MESSAGING_DESTINATION_NAME;
56
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET;
67
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION;
78
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.RECORD_QUEUE_TIME_MS;
@@ -121,6 +122,7 @@ public void onConsume(
121122
span.setResourceName(CONSUMER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, CONSUMER_PREFIX));
122123
span.setTag(PARTITION, record.partition());
123124
span.setTag(OFFSET, record.offset());
125+
span.setTag(MESSAGING_DESTINATION_NAME, topic);
124126
if (consumerGroup != null) {
125127
span.setTag(CONSUMER_GROUP, consumerGroup);
126128
}
@@ -162,6 +164,7 @@ public void onProduce(
162164
}
163165
final String topic = record.topic() == null ? "kafka" : record.topic();
164166
span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX));
167+
span.setTag(MESSAGING_DESTINATION_NAME, topic);
165168
}
166169
}
167170
}

dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy

+2
Original file line numberDiff line numberDiff line change
@@ -924,6 +924,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
924924
"$Tags.COMPONENT" "java-kafka"
925925
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
926926
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
927+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC"
927928
if (partitioned) {
928929
"$InstrumentationTags.PARTITION" { it >= 0 }
929930
}
@@ -999,6 +1000,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
9991000
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
10001001
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
10011002
"$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { it >= 0 }
1003+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC"
10021004
if (tombstone) {
10031005
"$InstrumentationTags.TOMBSTONE" true
10041006
}

dd-java-agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy

+4
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ class KafkaStreamsTest extends AgentTestRunner {
131131
tags {
132132
"$Tags.COMPONENT" "java-kafka"
133133
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
134+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PENDING"
134135
if ({ isDataStreamsEnabled()}) {
135136
"$DDTags.PATHWAY_HASH" { String }
136137
}
@@ -158,6 +159,7 @@ class KafkaStreamsTest extends AgentTestRunner {
158159
"$InstrumentationTags.PARTITION" { it >= 0 }
159160
"$InstrumentationTags.OFFSET" 0
160161
"$InstrumentationTags.PROCESSOR_NAME" "KSTREAM-SOURCE-0000000000"
162+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PENDING"
161163
"asdf" "testing"
162164
if ({ isDataStreamsEnabled()}) {
163165
"$DDTags.PATHWAY_HASH" { String }
@@ -180,6 +182,7 @@ class KafkaStreamsTest extends AgentTestRunner {
180182
tags {
181183
"$Tags.COMPONENT" "java-kafka"
182184
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
185+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PROCESSED"
183186
if ({ isDataStreamsEnabled()}) {
184187
"$DDTags.PATHWAY_HASH" { String }
185188
}
@@ -204,6 +207,7 @@ class KafkaStreamsTest extends AgentTestRunner {
204207
"$InstrumentationTags.OFFSET" 0
205208
"$InstrumentationTags.CONSUMER_GROUP" "sender"
206209
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
210+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PROCESSED"
207211
"testing" 123
208212
if ({ isDataStreamsEnabled()}) {
209213
"$DDTags.PATHWAY_HASH" { String }

dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTestBase.groovy

+4
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ abstract class KafkaStreamsTestBase extends VersionedNamingTestBase {
157157
tags {
158158
"$Tags.COMPONENT" "java-kafka"
159159
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
160+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PENDING"
160161
if ({ isDataStreamsEnabled() }){
161162
"$DDTags.PATHWAY_HASH" { String }
162163
}
@@ -201,6 +202,7 @@ abstract class KafkaStreamsTestBase extends VersionedNamingTestBase {
201202
"$InstrumentationTags.PARTITION" { it >= 0 }
202203
"$InstrumentationTags.OFFSET" 0
203204
"$InstrumentationTags.PROCESSOR_NAME" "KSTREAM-SOURCE-0000000000"
205+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PENDING"
204206
"asdf" "testing"
205207
if ({isDataStreamsEnabled()}) {
206208
"$DDTags.PATHWAY_HASH" { String }
@@ -223,6 +225,7 @@ abstract class KafkaStreamsTestBase extends VersionedNamingTestBase {
223225
tags {
224226
"$Tags.COMPONENT" "java-kafka"
225227
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
228+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PROCESSED"
226229
if ({isDataStreamsEnabled()}) {
227230
"$DDTags.PATHWAY_HASH" { String }
228231
}
@@ -267,6 +270,7 @@ abstract class KafkaStreamsTestBase extends VersionedNamingTestBase {
267270
"$InstrumentationTags.OFFSET" 0
268271
"$InstrumentationTags.CONSUMER_GROUP" "sender"
269272
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
273+
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PROCESSED"
270274
"testing" 123
271275
if ({isDataStreamsEnabled()}) {
272276
"$DDTags.PATHWAY_HASH" { String }

internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java

+1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public class InstrumentationTags {
9595
public static final String MESSAGE = "message";
9696
public static final String HANDLER_TYPE = "handler.type";
9797
public static final String KAFKA_BOOTSTRAP_SERVERS = "messaging.kafka.bootstrap.servers";
98+
public static final String MESSAGING_DESTINATION_NAME = "messaging.destination.name";
9899
public static final String QUARTZ_JOB_NAME = "quartz.job.name";
99100
public static final String QUARTZ_JOB_GROUP = "quartz.job.group";
100101
public static final String QUARTZ_TRIGGER_NAME = "quartz.trigger.name";

0 commit comments

Comments
 (0)