Skip to content

Commit ed3e99d

Browse files
committed
stop injecting the OpenLineage (OL) listener if it's not available
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 866a408 commit ed3e99d

File tree

9 files changed

+61
-34
lines changed

9 files changed

+61
-34
lines changed

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import datadog.trace.agent.tooling.InstrumenterModule;
88
import net.bytebuddy.asm.Advice;
99
import org.apache.spark.SparkContext;
10+
import org.apache.spark.util.Utils;
1011

1112
@AutoService(InstrumenterModule.class)
1213
public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +18,7 @@ public String[] helperClassNames() {
1718
packageName + ".DatabricksParentContext",
1819
packageName + ".OpenlineageParentContext",
1920
packageName + ".DatadogSpark212Listener",
21+
packageName + ".PredeterminedTraceIdContext",
2022
packageName + ".RemoveEldestHashMap",
2123
packageName + ".SparkAggregatedTaskMetrics",
2224
packageName + ".SparkConfAllowList",
@@ -41,18 +43,23 @@ public void methodAdvice(MethodTransformer transformer) {
4143
public static class InjectListener {
4244
@Advice.OnMethodEnter(suppress = Throwable.class)
4345
public static void enter(@Advice.This SparkContext sparkContext) {
44-
if (!sparkContext.conf().contains("spark.extraListeners")) {
45-
sparkContext
46-
.conf()
47-
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
48-
} else {
49-
String extraListeners = sparkContext.conf().get("spark.extraListeners");
50-
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
46+
// checking whether OpenLineage integration is available and that it supports tags
47+
if (Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
48+
&& Utils.classIsLoadable(
49+
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
50+
if (!sparkContext.conf().contains("spark.extraListeners")) {
5151
sparkContext
5252
.conf()
53-
.set(
54-
"spark.extraListeners",
55-
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
53+
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
54+
} else {
55+
String extraListeners = sparkContext.conf().get("spark.extraListeners");
56+
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
57+
sparkContext
58+
.conf()
59+
.set(
60+
"spark.extraListeners",
61+
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
62+
}
5663
}
5764
}
5865

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import datadog.trace.agent.tooling.InstrumenterModule;
88
import net.bytebuddy.asm.Advice;
99
import org.apache.spark.SparkContext;
10+
import org.apache.spark.util.Utils;
1011

1112
@AutoService(InstrumenterModule.class)
1213
public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +18,7 @@ public String[] helperClassNames() {
1718
packageName + ".DatabricksParentContext",
1819
packageName + ".OpenlineageParentContext",
1920
packageName + ".DatadogSpark213Listener",
21+
packageName + ".PredeterminedTraceIdContext",
2022
packageName + ".RemoveEldestHashMap",
2123
packageName + ".SparkAggregatedTaskMetrics",
2224
packageName + ".SparkConfAllowList",
@@ -41,18 +43,23 @@ public void methodAdvice(MethodTransformer transformer) {
4143
public static class InjectListener {
4244
@Advice.OnMethodEnter(suppress = Throwable.class)
4345
public static void enter(@Advice.This SparkContext sparkContext) {
44-
if (!sparkContext.conf().contains("spark.extraListeners")) {
45-
sparkContext
46-
.conf()
47-
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
48-
} else {
49-
String extraListeners = sparkContext.conf().get("spark.extraListeners");
50-
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
46+
// checking whether OpenLineage integration is available and that it supports tags
47+
if (Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
48+
&& Utils.classIsLoadable(
49+
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
50+
if (!sparkContext.conf().contains("spark.extraListeners")) {
5151
sparkContext
5252
.conf()
53-
.set(
54-
"spark.extraListeners",
55-
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
53+
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
54+
} else {
55+
String extraListeners = sparkContext.conf().get("spark.extraListeners");
56+
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
57+
sparkContext
58+
.conf()
59+
.set(
60+
"spark.extraListeners",
61+
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
62+
}
5663
}
5764
}
5865

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
8888

8989
// This is created by constructor, and used if we're not in other known
9090
// parent context like Databricks, OpenLineage
91-
private final PredeterminedTraceIdParentContext predeterminedTraceIdParentContext;
91+
private final PredeterminedTraceIdContext predeterminedTraceIdContext;
9292

9393
private AgentSpan applicationSpan;
9494
private SparkListenerApplicationStart applicationStart;
@@ -145,9 +145,8 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
145145
databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
146146
databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName);
147147
sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks);
148-
predeterminedTraceIdParentContext =
149-
new PredeterminedTraceIdParentContext(
150-
Config.get().getIdGenerationStrategy().generateTraceId());
148+
predeterminedTraceIdContext =
149+
new PredeterminedTraceIdContext(Config.get().getIdGenerationStrategy().generateTraceId());
151150

152151
// If JVM exiting with System.exit(code), it bypass the code closing the application span
153152
//
@@ -167,7 +166,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
167166
}
168167

169168
public void setupOpenLineage(DDTraceId traceId) {
170-
log.error("Setting up OpenLineage tags");
169+
log.debug("Setting up OpenLineage tags");
171170
if (openLineageSparkListener != null) {
172171
openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
173172
openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
@@ -182,7 +181,7 @@ public void setupOpenLineage(DDTraceId traceId) {
182181
"_dd.trace_id:" + traceId.toString() + ";_dd.ol_intake.emit_spans:false");
183182
return;
184183
}
185-
log.error("No OpenLineageSparkListener!");
184+
log.debug("No OpenLineageSparkListener!");
186185
}
187186

188187
/** Resource name of the spark job. Provide an implementation based on a specific scala version */
@@ -210,7 +209,7 @@ public synchronized void onApplicationStart(SparkListenerApplicationStart applic
210209
setupOpenLineage(
211210
OpenlineageParentContext.from(sparkConf)
212211
.map(context -> context.getTraceId())
213-
.orElse(predeterminedTraceIdParentContext.getTraceId()));
212+
.orElse(predeterminedTraceIdContext.getTraceId()));
214213
notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
215214
}
216215

@@ -219,7 +218,7 @@ private void initApplicationSpanIfNotInitialized() {
219218
return;
220219
}
221220

222-
log.error("Starting tracer application span.");
221+
log.debug("Starting tracer application span.");
223222

224223
AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null);
225224

@@ -242,7 +241,7 @@ private void initApplicationSpanIfNotInitialized() {
242241
if (openlineageParentContext.isPresent()) {
243242
captureOpenlineageContextIfPresent(builder, openlineageParentContext.get());
244243
} else {
245-
builder.asChildOf(predeterminedTraceIdParentContext);
244+
builder.asChildOf(predeterminedTraceIdContext);
246245
}
247246

248247
applicationSpan = builder.start();

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public String[] knownMatchingTypes() {
3232
"org.apache.spark.SparkContext",
3333
"org.apache.spark.deploy.SparkSubmit",
3434
"org.apache.spark.deploy.yarn.ApplicationMaster",
35+
"org.apache.spark.util.Utils",
3536
"org.apache.spark.util.SparkClassUtils",
3637
"org.apache.spark.scheduler.LiveListenerBus"
3738
};
@@ -125,7 +126,7 @@ public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener)
125126
.getCanonicalName()
126127
.equals("io.openlineage.spark.agent.OpenLineageSparkListener")) {
127128
LoggerFactory.getLogger(Config.class)
128-
.debug("Detected OL listener, skipping initialization");
129+
.debug("Detected OL listener, skipping adding to ListenerBus");
129130
return true;
130131
}
131132
return false;

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public String[] helperClassNames() {
2929
packageName + ".AbstractDatadogSparkListener",
3030
packageName + ".DatabricksParentContext",
3131
packageName + ".OpenlineageParentContext",
32+
packageName + ".PredeterminedTraceIdContext",
3233
packageName + ".RemoveEldestHashMap",
3334
packageName + ".SparkAggregatedTaskMetrics",
3435
packageName + ".SparkConfAllowList",
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
import datadog.trace.api.sampling.PrioritySampling;
66
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
77
import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector;
8+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
89
import java.util.Map;
910

10-
public class PredeterminedTraceIdParentContext implements AgentSpanContext {
11+
public class PredeterminedTraceIdContext implements AgentSpanContext {
1112
private final DDTraceId traceId;
1213

13-
public PredeterminedTraceIdParentContext(DDTraceId traceId) {
14+
public PredeterminedTraceIdContext(DDTraceId traceId) {
1415
this.traceId = traceId;
1516
}
1617

@@ -26,7 +27,7 @@ public long getSpanId() {
2627

2728
@Override
2829
public AgentTraceCollector getTraceCollector() {
29-
return null;
30+
return AgentTracer.NoopAgentTraceCollector.INSTANCE;
3031
}
3132

3233
@Override

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ import spock.lang.IgnoreIf
2525
Platform.isJ9()
2626
})
2727
abstract class AbstractSparkTest extends AgentTestRunner {
28+
@Override
29+
protected boolean isDataJobsEnabled() {
30+
return true
31+
}
2832

2933
@Override
3034
void configurePreAgent() {
@@ -53,6 +57,7 @@ abstract class AbstractSparkTest extends AgentTestRunner {
5357
resourceName "spark.application"
5458
spanType "spark"
5559
errored false
60+
assert span.context().getTraceId() != DDTraceId.ZERO
5661
assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
5762
assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
5863
parent()

dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,10 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L
272272
return false
273273
}
274274

275+
protected boolean isDataJobsEnabled() {
276+
return false
277+
}
278+
275279
protected long dataStreamsBucketDuration() {
276280
TimeUnit.MILLISECONDS.toNanos(50)
277281
}
@@ -464,6 +468,7 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L
464468
protected void configurePreAgent() {
465469
injectSysConfig(TracerConfig.SCOPE_ITERATION_KEEP_ALIVE, "1") // don't let iteration spans linger
466470
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, String.valueOf(isDataStreamsEnabled()))
471+
injectSysConfig(GeneralConfig.DATA_JOBS_ENABLED, String.valueOf(isDataJobsEnabled()))
467472
}
468473

469474
void setup() {

internal-api/src/main/java/datadog/trace/api/sampling/SamplingMechanism.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ public static boolean validateWithSamplingPriority(int mechanism, int priority)
6969
* @return
7070
*/
7171
public static boolean canAvoidSamplingPriorityLock(int priority, int mechanism) {
72-
return !Config.get().isApmTracingEnabled() && mechanism == SamplingMechanism.APPSEC;
72+
return (!Config.get().isApmTracingEnabled() && mechanism == SamplingMechanism.APPSEC)
73+
|| (Config.get().isDataJobsEnabled() && mechanism == DATA_JOBS);
7374
}
7475

7576
private SamplingMechanism() {}

0 commit comments

Comments (0)