diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTestBase.groovy index eb04ce38bc4..35cddc20fd1 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTestBase.groovy @@ -305,6 +305,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { "$Tags.COMPONENT" "java-kafka" "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC" if (partitioned) { "$InstrumentationTags.PARTITION" { it >= 0 } } @@ -380,6 +381,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } "$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { it >= 0 } + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC" if (tombstone) { "$InstrumentationTags.TOMBSTONE" true } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java index fcdd3943f8f..c463cada1a0 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java @@ -2,6 +2,7 @@ import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.MESSAGING_DESTINATION_NAME; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.RECORD_QUEUE_TIME_MS; @@ -121,6 +122,7 @@ public void onConsume( span.setResourceName(CONSUMER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, CONSUMER_PREFIX)); span.setTag(PARTITION, record.partition()); span.setTag(OFFSET, record.offset()); + span.setTag(MESSAGING_DESTINATION_NAME, topic); if (consumerGroup != null) { span.setTag(CONSUMER_GROUP, consumerGroup); } @@ -162,6 +164,7 @@ public void onProduce( } final String topic = record.topic() == null ? "kafka" : record.topic(); span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX)); + span.setTag(MESSAGING_DESTINATION_NAME, topic); } } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index cecf73f26d4..80d1cf70572 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -1037,6 +1037,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { "$Tags.COMPONENT" "java-kafka" "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC" if (partitioned) { "$InstrumentationTags.PARTITION" { it >= 0 } } @@ -1112,6 +1113,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } "$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { it >= 0 } + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC" if (tombstone) { "$InstrumentationTags.TOMBSTONE" true } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy index da6eaf91644..30875d487e1 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy @@ -181,6 +181,7 @@ class KafkaReactorForkedTest extends AgentTestRunner { "$Tags.COMPONENT" "java-kafka" "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$KafkaClientTestBase.SHARED_TOPIC" peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS) defaultTags() } @@ -211,6 +212,7 @@ class KafkaReactorForkedTest extends AgentTestRunner { "$InstrumentationTags.CONSUMER_GROUP" "sender" "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$KafkaClientTestBase.SHARED_TOPIC" defaultTags(true) } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java index 2361a441e50..0b2436d2d2e 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java @@ -2,6 +2,7 @@ import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.MESSAGING_DESTINATION_NAME; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.RECORD_QUEUE_TIME_MS; @@ -121,6 +122,7 @@ public void onConsume( span.setResourceName(CONSUMER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, CONSUMER_PREFIX)); span.setTag(PARTITION, record.partition()); span.setTag(OFFSET, record.offset()); + span.setTag(MESSAGING_DESTINATION_NAME, topic); if (consumerGroup != null) { span.setTag(CONSUMER_GROUP, consumerGroup); } @@ -162,6 +164,7 @@ public void onProduce( } final String topic = record.topic() == null ? "kafka" : record.topic(); span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX)); + span.setTag(MESSAGING_DESTINATION_NAME, topic); } } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy index 6476c47f84f..b5a27100cef 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy @@ -924,6 +924,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { "$Tags.COMPONENT" "java-kafka" "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC" if (partitioned) { "$InstrumentationTags.PARTITION" { it >= 0 } } @@ -999,6 +1000,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } "$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { it >= 0 } + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC" if (tombstone) { "$InstrumentationTags.TOMBSTONE" true } diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy b/dd-java-agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy index 54ab7a72343..095d479e2d9 100644 --- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy @@ -131,6 +131,7 @@ class KafkaStreamsTest extends AgentTestRunner { tags { "$Tags.COMPONENT" "java-kafka" "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PENDING" if ({ isDataStreamsEnabled()}) { "$DDTags.PATHWAY_HASH" { String } } @@ -158,6 +159,7 @@ class KafkaStreamsTest extends AgentTestRunner { "$InstrumentationTags.PARTITION" { it >= 0 } "$InstrumentationTags.OFFSET" 0 "$InstrumentationTags.PROCESSOR_NAME" "KSTREAM-SOURCE-0000000000" + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PENDING" "asdf" "testing" if ({ isDataStreamsEnabled()}) { "$DDTags.PATHWAY_HASH" { String } @@ -180,6 +182,7 @@ class KafkaStreamsTest extends AgentTestRunner { tags { "$Tags.COMPONENT" "java-kafka" "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PROCESSED" if ({ isDataStreamsEnabled()}) { "$DDTags.PATHWAY_HASH" { String } } @@ -204,6 +207,7 @@ class KafkaStreamsTest extends AgentTestRunner { "$InstrumentationTags.OFFSET" 0 "$InstrumentationTags.CONSUMER_GROUP" "sender" "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PROCESSED" "testing" 123 if ({ isDataStreamsEnabled()}) { "$DDTags.PATHWAY_HASH" { String } diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTestBase.groovy b/dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTestBase.groovy index 790da12c510..1267187681b 100644 --- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTestBase.groovy @@ -157,6 +157,7 @@ abstract class KafkaStreamsTestBase extends VersionedNamingTestBase { tags { "$Tags.COMPONENT" "java-kafka" "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PENDING" if ({ isDataStreamsEnabled() }){ "$DDTags.PATHWAY_HASH" { String } } @@ -201,6 +202,7 @@ abstract class KafkaStreamsTestBase extends VersionedNamingTestBase { "$InstrumentationTags.PARTITION" { it >= 0 } "$InstrumentationTags.OFFSET" 0 "$InstrumentationTags.PROCESSOR_NAME" "KSTREAM-SOURCE-0000000000" + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PENDING" "asdf" "testing" if ({isDataStreamsEnabled()}) { "$DDTags.PATHWAY_HASH" { String } @@ -223,6 +225,7 @@ abstract class KafkaStreamsTestBase extends VersionedNamingTestBase { tags { "$Tags.COMPONENT" "java-kafka" "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PROCESSED" if ({isDataStreamsEnabled()}) { "$DDTags.PATHWAY_HASH" { String } } @@ -267,6 +270,7 @@ abstract class KafkaStreamsTestBase extends VersionedNamingTestBase { "$InstrumentationTags.OFFSET" 0 "$InstrumentationTags.CONSUMER_GROUP" "sender" "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PROCESSED" "testing" 123 if ({isDataStreamsEnabled()}) { "$DDTags.PATHWAY_HASH" { String } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java index 5c8e0405e7c..bfd71a2abcf 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java @@ -95,6 +95,7 @@ public class InstrumentationTags { public static final String MESSAGE = "message"; public static final String HANDLER_TYPE = "handler.type"; public static final String KAFKA_BOOTSTRAP_SERVERS = "messaging.kafka.bootstrap.servers"; + public static final String MESSAGING_DESTINATION_NAME = "messaging.destination.name"; public static final String QUARTZ_JOB_NAME = "quartz.job.name"; public static final String QUARTZ_JOB_GROUP = "quartz.job.group"; public static final String QUARTZ_TRIGGER_NAME = "quartz.trigger.name";