Skip to content

Commit b4637a8

Browse files
committed
add config variable to determine whether enable ol injection
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent ccd94f3 commit b4637a8

File tree

13 files changed

+81
-50
lines changed

13 files changed

+81
-50
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,11 @@ public static void start(
242242
setSystemPropertyDefault(
243243
propertyNameToSystemPropertyName("integration.kafka.enabled"), "true");
244244

245+
if (Config.get().isDataJobsOpenLineageEnabled()) {
246+
setSystemPropertyDefault(
247+
propertyNameToSystemPropertyName("integration.openlineage-spark.enabled"), "true");
248+
}
249+
245250
String javaCommand = System.getProperty("sun.java.command");
246251
String dataJobsCommandPattern = Config.get().getDataJobsCommandPattern();
247252
if (!isDataJobsSupported(javaCommand, dataJobsCommandPattern)) {

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55

66
import com.google.auto.service.AutoService;
77
import datadog.trace.agent.tooling.InstrumenterModule;
8+
import datadog.trace.api.Config;
89
import net.bytebuddy.asm.Advice;
910
import org.apache.spark.SparkContext;
11+
import org.apache.spark.util.Utils;
1012

1113
@AutoService(InstrumenterModule.class)
1214
public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@@ -44,27 +46,25 @@ public static class InjectListener {
4446
public static void enter(@Advice.This SparkContext sparkContext) {
4547
// checking whether OpenLineage integration is available and that it supports tags
4648
// Disabling this mechanism for this PR. Will be enabled with provided with Config option.
47-
// if (Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
48-
// && Utils.classIsLoadable(
49-
// "io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
50-
// if (!sparkContext.conf().contains("spark.extraListeners")) {
51-
// sparkContext
52-
// .conf()
53-
// .set("spark.extraListeners",
54-
// "io.openlineage.spark.agent.OpenLineageSparkListener");
55-
// } else {
56-
// String extraListeners = sparkContext.conf().get("spark.extraListeners");
57-
// if
58-
// (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
59-
// sparkContext
60-
// .conf()
61-
// .set(
62-
// "spark.extraListeners",
63-
// extraListeners +
64-
// ",io.openlineage.spark.agent.OpenLineageSparkListener");
65-
// }
66-
// }
67-
// }
49+
if (Config.get().isDataJobsOpenLineageEnabled()
50+
&& Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
51+
&& Utils.classIsLoadable(
52+
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
53+
if (!sparkContext.conf().contains("spark.extraListeners")) {
54+
sparkContext
55+
.conf()
56+
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
57+
} else {
58+
String extraListeners = sparkContext.conf().get("spark.extraListeners");
59+
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
60+
sparkContext
61+
.conf()
62+
.set(
63+
"spark.extraListeners",
64+
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
65+
}
66+
}
67+
}
6868

6969
// We want to add the Datadog listener as the first listener
7070
AbstractDatadogSparkListener.listener =

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55

66
import com.google.auto.service.AutoService;
77
import datadog.trace.agent.tooling.InstrumenterModule;
8+
import datadog.trace.api.Config;
89
import net.bytebuddy.asm.Advice;
910
import org.apache.spark.SparkContext;
11+
import org.apache.spark.util.Utils;
1012

1113
@AutoService(InstrumenterModule.class)
1214
public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@@ -42,29 +44,29 @@ public void methodAdvice(MethodTransformer transformer) {
4244
public static class InjectListener {
4345
@Advice.OnMethodEnter(suppress = Throwable.class)
4446
public static void enter(@Advice.This SparkContext sparkContext) {
45-
// checking whether OpenLineage integration is available and that it supports tags
46-
// Disabling this mechanism for this PR. Will be enabled with provided with Config option.
47-
// if (Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
48-
// && Utils.classIsLoadable(
49-
// "io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
50-
// if (!sparkContext.conf().contains("spark.extraListeners")) {
51-
// sparkContext
52-
// .conf()
53-
// .set("spark.extraListeners",
54-
// "io.openlineage.spark.agent.OpenLineageSparkListener");
55-
// } else {
56-
// String extraListeners = sparkContext.conf().get("spark.extraListeners");
57-
// if
58-
// (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
59-
// sparkContext
60-
// .conf()
61-
// .set(
62-
// "spark.extraListeners",
63-
// extraListeners +
64-
// ",io.openlineage.spark.agent.OpenLineageSparkListener");
65-
// }
66-
// }
67-
// }
47+
// Checking whether OpenLineage integration is available and that it supports tags
48+
// Disabling this mechanism for this PR. Will be enabled with provided with Config
49+
// option.
50+
51+
if (Config.get().isDataJobsOpenLineageEnabled()
52+
&& Utils.classIsLoadable("io.openlineage.spark.agent.OpenLineageSparkListener")
53+
&& Utils.classIsLoadable(
54+
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
55+
if (!sparkContext.conf().contains("spark.extraListeners")) {
56+
sparkContext
57+
.conf()
58+
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
59+
} else {
60+
String extraListeners = sparkContext.conf().get("spark.extraListeners");
61+
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
62+
sparkContext
63+
.conf()
64+
.set(
65+
"spark.extraListeners",
66+
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
67+
}
68+
}
69+
}
6870

6971
// We want to add the Datadog listener as the first listener
7072
AbstractDatadogSparkListener.listener =

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,12 @@ public void setupOpenLineage(DDTraceId traceId) {
206206
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
207207
this.applicationStart = applicationStart;
208208

209-
setupOpenLineage(
210-
OpenlineageParentContext.from(sparkConf)
211-
.map(context -> context.getTraceId())
212-
.orElse(predeterminedTraceIdContext.getTraceId()));
209+
if (Config.get().isDataJobsOpenLineageEnabled()) {
210+
setupOpenLineage(
211+
OpenlineageParentContext.from(sparkConf)
212+
.map(context -> context.getTraceId())
213+
.orElse(predeterminedTraceIdContext.getTraceId()));
214+
}
213215
notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
214216
}
215217

@@ -750,6 +752,11 @@ public void onOtherEvent(SparkListenerEvent event) {
750752
}
751753

752754
private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
755+
if (!Config.get().isDataJobsOpenLineageEnabled()) {
756+
log.trace("Ignoring event {} - OpenLineage not enabled", event);
757+
return;
758+
}
759+
753760
if (isRunningOnDatabricks || isStreamingJob) {
754761
log.debug("Not emitting event when running on databricks or on streaming jobs");
755762
return;

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,11 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
117117

118118
public static class LiveListenerBusAdvice {
119119
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
120+
// If OL is disabled in tracer config but user set it up manually don't interfere
120121
public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) {
121-
if (listener == null || listener.getClass().getCanonicalName() == null) {
122+
if (!Config.get().isDataJobsOpenLineageEnabled()
123+
|| listener == null
124+
|| listener.getClass().getCanonicalName() == null) {
122125
return false;
123126
}
124127
if (listener

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public String[] helperClassNames() {
4141

4242
@Override
4343
public boolean defaultEnabled() {
44-
return true;
44+
return false;
4545
}
4646

4747
@Override

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ abstract class AbstractSpark24SqlTest extends AgentTestRunner {
1414
void configurePreAgent() {
1515
super.configurePreAgent()
1616
injectSysConfig("dd.integration.spark.enabled", "true")
17+
injectSysConfig("dd.integration.openlineage-spark.enabled", "true")
1718
}
1819

1920
static Dataset<Row> generateSampleDataframe(SparkSession spark) {

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ abstract class AbstractSpark32SqlTest extends AgentTestRunner {
1010
void configurePreAgent() {
1111
super.configurePreAgent()
1212
injectSysConfig("dd.integration.spark.enabled", "true")
13+
injectSysConfig("dd.integration.openlineage-spark.enabled", "true")
1314
}
1415

1516
def "compute a GROUP BY sql query plan"() {

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
2525
void configurePreAgent() {
2626
super.configurePreAgent()
2727
injectSysConfig("dd.integration.spark.enabled", "true")
28+
injectSysConfig("dd.integration.openlineage-spark.enabled", "true")
2829
}
2930

3031
private SparkSession createSparkSession(String appName) {

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ abstract class AbstractSparkTest extends AgentTestRunner {
3434
void configurePreAgent() {
3535
super.configurePreAgent()
3636
injectSysConfig("dd.integration.spark.enabled", "true")
37+
injectSysConfig("dd.integration.openlineage-spark.enabled", "true")
3738
}
3839

3940
def "generate application span with child job and stages"() {

dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ public final class ConfigDefaults {
221221
static final int DEFAULT_CWS_TLS_REFRESH = 5000;
222222

223223
static final boolean DEFAULT_DATA_JOBS_ENABLED = false;
224+
static final boolean DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED = true;
224225

225226
static final boolean DEFAULT_DATA_STREAMS_ENABLED = false;
226227
static final int DEFAULT_DATA_STREAMS_BUCKET_DURATION = 10; // seconds

dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public final class GeneralConfig {
6969

7070
public static final String DATA_JOBS_ENABLED = "data.jobs.enabled";
7171
public static final String DATA_JOBS_COMMAND_PATTERN = "data.jobs.command.pattern";
72+
public static final String DATA_JOBS_OPENLINEAGE_ENABLED = "data.jobs.openlineage.enabled";
7273

7374
public static final String DATA_STREAMS_ENABLED = "data.streams.enabled";
7475
public static final String DATA_STREAMS_BUCKET_DURATION_SECONDS =

internal-api/src/main/java/datadog/trace/api/Config.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@ public static String getHostName() {
498498

499499
private final boolean dataJobsEnabled;
500500
private final String dataJobsCommandPattern;
501+
private final boolean dataJobsOpenLineageEnabled;
501502

502503
private final boolean dataStreamsEnabled;
503504
private final float dataStreamsBucketDurationSeconds;
@@ -1827,6 +1828,9 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())
18271828
cwsTlsRefresh = configProvider.getInteger(CWS_TLS_REFRESH, DEFAULT_CWS_TLS_REFRESH);
18281829

18291830
dataJobsEnabled = configProvider.getBoolean(DATA_JOBS_ENABLED, DEFAULT_DATA_JOBS_ENABLED);
1831+
dataJobsOpenLineageEnabled =
1832+
configProvider.getBoolean(
1833+
DATA_JOBS_OPENLINEAGE_ENABLED, DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED);
18301834
dataJobsCommandPattern = configProvider.getString(DATA_JOBS_COMMAND_PATTERN);
18311835

18321836
dataStreamsEnabled =
@@ -3587,6 +3591,10 @@ public boolean isDataJobsEnabled() {
35873591
return dataJobsEnabled;
35883592
}
35893593

3594+
public boolean isDataJobsOpenLineageEnabled() {
3595+
return dataJobsOpenLineageEnabled;
3596+
}
3597+
35903598
public String getDataJobsCommandPattern() {
35913599
return dataJobsCommandPattern;
35923600
}

0 commit comments

Comments
 (0)