Skip to content

Commit b5db502

Browse files
committed
Use existence of MetadataRecoveryStrategy to decide whether to apply pre-3.8 or post-3.8 kafka-clients instrumentation
One exception: IAST's KafkaDeserializerInstrumentation is compatible with all 3.x versions of kafka-clients
1 parent 4925a50 commit b5db502

15 files changed

+123
-1
lines changed

dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle

+8
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
muzzle {
22
pass {
3+
name = "since-0.11"
34
group = "org.apache.kafka"
45
module = "kafka-clients"
56
versions = "[0.11.0.0,)"
67
assertInverse = true
78
}
9+
pass {
10+
name = "before-3.8"
11+
group = "org.apache.kafka"
12+
module = "kafka-clients"
13+
versions = "[0.11.0.0,3.8.0)"
14+
assertInverse = true
15+
}
816
}
917

1018
apply from: "$rootDir/gradle/java.gradle"

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/ConsumerCoordinatorInstrumentation.java

+12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
45
import static datadog.trace.core.datastreams.TagsProcessor.CONSUMER_GROUP_TAG;
56
import static datadog.trace.core.datastreams.TagsProcessor.KAFKA_CLUSTER_ID_TAG;
@@ -17,6 +18,7 @@
1718
import java.util.LinkedHashMap;
1819
import java.util.Map;
1920
import net.bytebuddy.asm.Advice;
21+
import net.bytebuddy.matcher.ElementMatcher;
2022
import org.apache.kafka.clients.Metadata;
2123
import org.apache.kafka.clients.consumer.ConsumerRecord;
2224
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -32,6 +34,16 @@ public ConsumerCoordinatorInstrumentation() {
3234
super("kafka");
3335
}
3436

37+
@Override
38+
public String muzzleDirective() {
39+
return "before-3.8";
40+
}
41+
42+
@Override
43+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
44+
return not(hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy")); // < 3.8
45+
}
46+
3547
@Override
3648
public Map<String, String> contextStore() {
3749
Map<String, String> contextStores = new HashMap<>();

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java

+13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
45
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
56
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
@@ -9,6 +10,7 @@
910
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
1011
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
1112
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
13+
import static net.bytebuddy.matcher.ElementMatchers.not;
1214
import static net.bytebuddy.matcher.ElementMatchers.returns;
1315
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
1416
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
@@ -24,6 +26,7 @@
2426
import java.util.List;
2527
import java.util.Map;
2628
import net.bytebuddy.asm.Advice;
29+
import net.bytebuddy.matcher.ElementMatcher;
2730
import org.apache.kafka.clients.Metadata;
2831
import org.apache.kafka.clients.consumer.ConsumerConfig;
2932
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -43,6 +46,16 @@ public KafkaConsumerInfoInstrumentation() {
4346
super("kafka");
4447
}
4548

49+
@Override
50+
public String muzzleDirective() {
51+
return "before-3.8";
52+
}
53+
54+
@Override
55+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
56+
return not(hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy")); // < 3.8
57+
}
58+
4659
@Override
4760
public Map<String, String> contextStore() {
4861
Map<String, String> contextStores = new HashMap<>();

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java

+13
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
45
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.CONSUMER_DECORATE;
56
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_CONSUME;
67
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
78
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
9+
import static net.bytebuddy.matcher.ElementMatchers.not;
810
import static net.bytebuddy.matcher.ElementMatchers.returns;
911
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
1012
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
@@ -18,6 +20,7 @@
1820
import java.util.List;
1921
import java.util.Map;
2022
import net.bytebuddy.asm.Advice;
23+
import net.bytebuddy.matcher.ElementMatcher;
2124
import org.apache.kafka.clients.Metadata;
2225
import org.apache.kafka.clients.consumer.ConsumerRecord;
2326
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -30,6 +33,16 @@ public KafkaConsumerInstrumentation() {
3033
super("kafka");
3134
}
3235

36+
@Override
37+
public String muzzleDirective() {
38+
return "before-3.8";
39+
}
40+
41+
@Override
42+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
43+
return not(hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy")); // < 3.8
44+
}
45+
3346
@Override
3447
public Map<String, String> contextStore() {
3548
Map<String, String> contextStores = new HashMap<>(2);

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDeserializerInstrumentation.java

+5
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ public KafkaDeserializerInstrumentation() {
4444
super("kafka");
4545
}
4646

47+
@Override
48+
public String muzzleDirective() {
49+
return "since-0.11";
50+
}
51+
4752
@Override
4853
public String hierarchyMarkerType() {
4954
return DESERIALIZER_CLASS;

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

+13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
45
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
56
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
@@ -18,6 +19,7 @@
1819
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
1920
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
2021
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
22+
import static net.bytebuddy.matcher.ElementMatchers.not;
2123
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
2224

2325
import com.google.auto.service.AutoService;
@@ -33,6 +35,7 @@
3335
import java.util.LinkedHashMap;
3436
import java.util.Map;
3537
import net.bytebuddy.asm.Advice;
38+
import net.bytebuddy.matcher.ElementMatcher;
3639
import org.apache.kafka.clients.ApiVersions;
3740
import org.apache.kafka.clients.Metadata;
3841
import org.apache.kafka.clients.producer.Callback;
@@ -49,6 +52,16 @@ public KafkaProducerInstrumentation() {
4952
super("kafka");
5053
}
5154

55+
@Override
56+
public String muzzleDirective() {
57+
return "before-3.8";
58+
}
59+
60+
@Override
61+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
62+
return not(hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy")); // < 3.8
63+
}
64+
5265
@Override
5366
public String instrumentedType() {
5467
return "org.apache.kafka.clients.producer.KafkaProducer";

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java

+12
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
45
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
56
import static java.util.Collections.singletonMap;
67
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
8+
import static net.bytebuddy.matcher.ElementMatchers.not;
79
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
810

911
import com.google.auto.service.AutoService;
@@ -27,6 +29,16 @@ public MetadataInstrumentation() {
2729
super("kafka");
2830
}
2931

32+
@Override
33+
public String muzzleDirective() {
34+
return "before-3.8";
35+
}
36+
37+
@Override
38+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
39+
return not(hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy")); // < 3.8
40+
}
41+
3042
@Override
3143
public String hierarchyMarkerType() {
3244
return "org.apache.kafka.clients.Metadata";

dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ muzzle {
66
group = "org.apache.kafka"
77
module = "kafka-clients"
88
versions = "[3.8.0,)"
9-
assertInverse = false
9+
assertInverse = true
1010
}
1111
}
1212

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorInstrumentation.java

+7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
45
import static net.bytebuddy.matcher.ElementMatchers.*;
56

@@ -9,6 +10,7 @@
910
import datadog.trace.api.Config;
1011
import java.util.HashMap;
1112
import java.util.Map;
13+
import net.bytebuddy.matcher.ElementMatcher;
1214

1315
@AutoService(InstrumenterModule.class)
1416
public final class ConsumerCoordinatorInstrumentation extends InstrumenterModule.Tracing
@@ -23,6 +25,11 @@ public boolean isEnabled() {
2325
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
2426
}
2527

28+
@Override
29+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
30+
return hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy"); // since 3.8
31+
}
32+
2633
@Override
2734
public Map<String, String> contextStore() {
2835
Map<String, String> contextStores = new HashMap<>(2);

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.declaresField;
45
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
56
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
@@ -36,6 +37,11 @@ public boolean isEnabled() {
3637
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
3738
}
3839

40+
@Override
41+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
42+
return hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy"); // since 3.8
43+
}
44+
3945
@Override
4046
public Map<String, String> contextStore() {
4147
Map<String, String> contextStores = new HashMap<>(4);

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentation.java

+7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
45
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
56
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
@@ -16,6 +17,7 @@
1617
import java.util.Iterator;
1718
import java.util.List;
1819
import java.util.Map;
20+
import net.bytebuddy.matcher.ElementMatcher;
1921

2022
@AutoService(InstrumenterModule.class)
2123
public final class KafkaConsumerInstrumentation extends InstrumenterModule.Tracing
@@ -30,6 +32,11 @@ public boolean isEnabled() {
3032
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
3133
}
3234

35+
@Override
36+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
37+
return hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy"); // since 3.8
38+
}
39+
3340
@Override
3441
public Map<String, String> contextStore() {
3542
Map<String, String> contextStores = new HashMap<>(2);

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java

+7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
45
import static java.util.Collections.singletonMap;
56
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
@@ -12,6 +13,7 @@
1213
import datadog.trace.agent.tooling.InstrumenterModule;
1314
import datadog.trace.api.Config;
1415
import java.util.Map;
16+
import net.bytebuddy.matcher.ElementMatcher;
1517

1618
@AutoService(InstrumenterModule.class)
1719
public final class KafkaProducerInstrumentation extends InstrumenterModule.Tracing
@@ -26,6 +28,11 @@ public boolean isEnabled() {
2628
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
2729
}
2830

31+
@Override
32+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
33+
return hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy"); // since 3.8
34+
}
35+
2936
@Override
3037
public String instrumentedType() {
3138
return "org.apache.kafka.clients.producer.KafkaProducer";

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/LegacyKafkaConsumerInfoInstrumentation.java

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.declaresField;
45
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
56
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
@@ -36,6 +37,11 @@ public boolean isEnabled() {
3637
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
3738
}
3839

40+
@Override
41+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
42+
return hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy"); // since 3.8
43+
}
44+
3945
@Override
4046
public Map<String, String> contextStore() {
4147
Map<String, String> contextStores = new HashMap<>(4);

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
45
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
56
import static java.util.Collections.singletonMap;
@@ -27,6 +28,11 @@ public boolean isEnabled() {
2728
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
2829
}
2930

31+
@Override
32+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
33+
return hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy"); // since 3.8
34+
}
35+
3036
@Override
3137
public String hierarchyMarkerType() {
3238
return "org.apache.kafka.clients.Metadata";

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/OffsetCommitCallbackInvokerInstrumentation.java

+7
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
34
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
45
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
56

67
import datadog.trace.agent.tooling.Instrumenter;
78
import datadog.trace.agent.tooling.InstrumenterModule;
89
import datadog.trace.api.Config;
10+
import net.bytebuddy.matcher.ElementMatcher;
911

1012
// new - this instrumentation is completely new.
1113
// the purpose of this class is to provide us with information on consumer group and cluster ID
@@ -20,6 +22,11 @@ public boolean isEnabled() {
2022
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
2123
}
2224

25+
@Override
26+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
27+
return hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy"); // since 3.8
28+
}
29+
2330
@Override
2431
public String instrumentedType() {
2532
return "org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker";

0 commit comments

Comments
 (0)