Skip to content

Commit 55d9b0d

Browse files
committed
stop injecting OL if it's not available
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 866a408 commit 55d9b0d

File tree

7 files changed

+49
-32
lines changed

7 files changed

+49
-32
lines changed

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import datadog.trace.agent.tooling.InstrumenterModule;
88
import net.bytebuddy.asm.Advice;
99
import org.apache.spark.SparkContext;
10+
import org.apache.spark.util.Utils;
1011

1112
@AutoService(InstrumenterModule.class)
1213
public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +18,7 @@ public String[] helperClassNames() {
1718
packageName + ".DatabricksParentContext",
1819
packageName + ".OpenlineageParentContext",
1920
packageName + ".DatadogSpark212Listener",
21+
packageName + ".PredeterminedTraceIdContext",
2022
packageName + ".RemoveEldestHashMap",
2123
packageName + ".SparkAggregatedTaskMetrics",
2224
packageName + ".SparkConfAllowList",
@@ -41,18 +43,23 @@ public void methodAdvice(MethodTransformer transformer) {
4143
public static class InjectListener {
4244
@Advice.OnMethodEnter(suppress = Throwable.class)
4345
public static void enter(@Advice.This SparkContext sparkContext) {
44-
if (!sparkContext.conf().contains("spark.extraListeners")) {
45-
sparkContext
46-
.conf()
47-
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
48-
} else {
49-
String extraListeners = sparkContext.conf().get("spark.extraListeners");
50-
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
46+
// checking whether OpenLineage integration is available and that it supports tags
47+
if (Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
48+
&& Utils.classIsLoadable(
49+
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
50+
if (!sparkContext.conf().contains("spark.extraListeners")) {
5151
sparkContext
5252
.conf()
53-
.set(
54-
"spark.extraListeners",
55-
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
53+
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
54+
} else {
55+
String extraListeners = sparkContext.conf().get("spark.extraListeners");
56+
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
57+
sparkContext
58+
.conf()
59+
.set(
60+
"spark.extraListeners",
61+
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
62+
}
5663
}
5764
}
5865

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import datadog.trace.agent.tooling.InstrumenterModule;
88
import net.bytebuddy.asm.Advice;
99
import org.apache.spark.SparkContext;
10+
import org.apache.spark.util.Utils;
1011

1112
@AutoService(InstrumenterModule.class)
1213
public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +18,7 @@ public String[] helperClassNames() {
1718
packageName + ".DatabricksParentContext",
1819
packageName + ".OpenlineageParentContext",
1920
packageName + ".DatadogSpark213Listener",
21+
packageName + ".PredeterminedTraceIdContext",
2022
packageName + ".RemoveEldestHashMap",
2123
packageName + ".SparkAggregatedTaskMetrics",
2224
packageName + ".SparkConfAllowList",
@@ -41,18 +43,23 @@ public void methodAdvice(MethodTransformer transformer) {
4143
public static class InjectListener {
4244
@Advice.OnMethodEnter(suppress = Throwable.class)
4345
public static void enter(@Advice.This SparkContext sparkContext) {
44-
if (!sparkContext.conf().contains("spark.extraListeners")) {
45-
sparkContext
46-
.conf()
47-
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
48-
} else {
49-
String extraListeners = sparkContext.conf().get("spark.extraListeners");
50-
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
46+
// checking whether OpenLineage integration is available and that it supports tags
47+
if (Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
48+
&& Utils.classIsLoadable(
49+
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
50+
if (!sparkContext.conf().contains("spark.extraListeners")) {
5151
sparkContext
5252
.conf()
53-
.set(
54-
"spark.extraListeners",
55-
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
53+
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
54+
} else {
55+
String extraListeners = sparkContext.conf().get("spark.extraListeners");
56+
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
57+
sparkContext
58+
.conf()
59+
.set(
60+
"spark.extraListeners",
61+
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
62+
}
5663
}
5764
}
5865

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
8888

8989
// This is created by constructor, and used if we're not in other known
9090
// parent context like Databricks, OpenLineage
91-
private final PredeterminedTraceIdParentContext predeterminedTraceIdParentContext;
91+
private final PredeterminedTraceIdContext predeterminedTraceIdContext;
9292

9393
private AgentSpan applicationSpan;
9494
private SparkListenerApplicationStart applicationStart;
@@ -145,9 +145,8 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
145145
databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
146146
databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName);
147147
sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks);
148-
predeterminedTraceIdParentContext =
149-
new PredeterminedTraceIdParentContext(
150-
Config.get().getIdGenerationStrategy().generateTraceId());
148+
predeterminedTraceIdContext =
149+
new PredeterminedTraceIdContext(Config.get().getIdGenerationStrategy().generateTraceId());
151150

152151
// If JVM exiting with System.exit(code), it bypass the code closing the application span
153152
//
@@ -167,7 +166,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
167166
}
168167

169168
public void setupOpenLineage(DDTraceId traceId) {
170-
log.error("Setting up OpenLineage tags");
169+
log.debug("Setting up OpenLineage tags");
171170
if (openLineageSparkListener != null) {
172171
openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
173172
openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
@@ -182,7 +181,7 @@ public void setupOpenLineage(DDTraceId traceId) {
182181
"_dd.trace_id:" + traceId.toString() + ";_dd.ol_intake.emit_spans:false");
183182
return;
184183
}
185-
log.error("No OpenLineageSparkListener!");
184+
log.debug("No OpenLineageSparkListener!");
186185
}
187186

188187
/** Resource name of the spark job. Provide an implementation based on a specific scala version */
@@ -210,7 +209,7 @@ public synchronized void onApplicationStart(SparkListenerApplicationStart applic
210209
setupOpenLineage(
211210
OpenlineageParentContext.from(sparkConf)
212211
.map(context -> context.getTraceId())
213-
.orElse(predeterminedTraceIdParentContext.getTraceId()));
212+
.orElse(predeterminedTraceIdContext.getTraceId()));
214213
notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
215214
}
216215

@@ -219,7 +218,7 @@ private void initApplicationSpanIfNotInitialized() {
219218
return;
220219
}
221220

222-
log.error("Starting tracer application span.");
221+
log.debug("Starting tracer application span.");
223222

224223
AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null);
225224

@@ -242,7 +241,7 @@ private void initApplicationSpanIfNotInitialized() {
242241
if (openlineageParentContext.isPresent()) {
243242
captureOpenlineageContextIfPresent(builder, openlineageParentContext.get());
244243
} else {
245-
builder.asChildOf(predeterminedTraceIdParentContext);
244+
builder.asChildOf(predeterminedTraceIdContext);
246245
}
247246

248247
applicationSpan = builder.start();

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public String[] knownMatchingTypes() {
3232
"org.apache.spark.SparkContext",
3333
"org.apache.spark.deploy.SparkSubmit",
3434
"org.apache.spark.deploy.yarn.ApplicationMaster",
35+
"org.apache.spark.util.Utils",
3536
"org.apache.spark.util.SparkClassUtils",
3637
"org.apache.spark.scheduler.LiveListenerBus"
3738
};

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public String[] helperClassNames() {
2929
packageName + ".AbstractDatadogSparkListener",
3030
packageName + ".DatabricksParentContext",
3131
packageName + ".OpenlineageParentContext",
32+
packageName + ".PredeterminedTraceIdContext",
3233
packageName + ".RemoveEldestHashMap",
3334
packageName + ".SparkAggregatedTaskMetrics",
3435
packageName + ".SparkConfAllowList",
dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/PredeterminedTraceIdContext.java (renamed from PredeterminedTraceIdParentContext.java — filename header missing in the capture; inferred from the class rename in the hunk below)

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
import datadog.trace.api.sampling.PrioritySampling;
66
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
77
import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector;
8+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
89
import java.util.Map;
910

10-
public class PredeterminedTraceIdParentContext implements AgentSpanContext {
11+
public class PredeterminedTraceIdContext implements AgentSpanContext {
1112
private final DDTraceId traceId;
1213

13-
public PredeterminedTraceIdParentContext(DDTraceId traceId) {
14+
public PredeterminedTraceIdContext(DDTraceId traceId) {
1415
this.traceId = traceId;
1516
}
1617

@@ -26,7 +27,7 @@ public long getSpanId() {
2627

2728
@Override
2829
public AgentTraceCollector getTraceCollector() {
29-
return null;
30+
return AgentTracer.NoopAgentTraceCollector.INSTANCE;
3031
}
3132

3233
@Override

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ abstract class AbstractSparkTest extends AgentTestRunner {
5353
resourceName "spark.application"
5454
spanType "spark"
5555
errored false
56+
assert span.context().getTraceId() != DDTraceId.ZERO
5657
assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
5758
assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
5859
parent()

0 commit comments

Comments (0)