Skip to content

Commit 089d1cc

Browse files
authored
Configure OpenLineage if present in Spark instrumentation (#8541)
* set OL if detected Signed-off-by: Maciej Obuchowski <[email protected]> * classloader and code review fixes Signed-off-by: Maciej Obuchowski <[email protected]> --------- Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 32046a3 commit 089d1cc

File tree

16 files changed

+383
-23
lines changed

16 files changed

+383
-23
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,11 @@ public static void start(
250250
setSystemPropertyDefault(
251251
propertyNameToSystemPropertyName("integration.kafka.enabled"), "true");
252252

253+
if (Config.get().isDataJobsOpenLineageEnabled()) {
254+
setSystemPropertyDefault(
255+
propertyNameToSystemPropertyName("integration.spark-openlineage.enabled"), "true");
256+
}
257+
253258
String javaCommand = System.getProperty("sun.java.command");
254259
String dataJobsCommandPattern = Config.get().getDataJobsCommandPattern();
255260
if (!isDataJobsSupported(javaCommand, dataJobsCommandPattern)) {

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55

66
import com.google.auto.service.AutoService;
77
import datadog.trace.agent.tooling.InstrumenterModule;
8+
import datadog.trace.api.Config;
89
import net.bytebuddy.asm.Advice;
910
import org.apache.spark.SparkContext;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
1013

1114
@AutoService(InstrumenterModule.class)
1215
public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +20,7 @@ public String[] helperClassNames() {
1720
packageName + ".DatabricksParentContext",
1821
packageName + ".OpenlineageParentContext",
1922
packageName + ".DatadogSpark212Listener",
23+
packageName + ".PredeterminedTraceIdContext",
2024
packageName + ".RemoveEldestHashMap",
2125
packageName + ".SparkAggregatedTaskMetrics",
2226
packageName + ".SparkConfAllowList",
@@ -41,6 +45,33 @@ public void methodAdvice(MethodTransformer transformer) {
4145
public static class InjectListener {
4246
@Advice.OnMethodEnter(suppress = Throwable.class)
4347
public static void enter(@Advice.This SparkContext sparkContext) {
48+
Logger log = LoggerFactory.getLogger("Spark212InjectListener");
49+
if (Config.get().isDataJobsOpenLineageEnabled()
50+
&& AbstractDatadogSparkListener.classIsLoadable(
51+
"io.openlineage.spark.agent.OpenLineageSparkListener")
52+
&& AbstractDatadogSparkListener.classIsLoadable(
53+
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
54+
if (!sparkContext.conf().contains("spark.extraListeners")) {
55+
log.debug("spark.extraListeners does not contain any listeners. Adding OpenLineage");
56+
sparkContext
57+
.conf()
58+
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
59+
} else {
60+
String extraListeners = sparkContext.conf().get("spark.extraListeners");
61+
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
62+
log.debug(
63+
"spark.extraListeners does contain listeners {}. Adding OpenLineage",
64+
extraListeners);
65+
sparkContext
66+
.conf()
67+
.set(
68+
"spark.extraListeners",
69+
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
70+
}
71+
}
72+
}
73+
74+
// We want to add the Datadog listener as the first listener
4475
AbstractDatadogSparkListener.listener =
4576
new DatadogSpark212Listener(
4677
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55

66
import com.google.auto.service.AutoService;
77
import datadog.trace.agent.tooling.InstrumenterModule;
8+
import datadog.trace.api.Config;
89
import net.bytebuddy.asm.Advice;
910
import org.apache.spark.SparkContext;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
1013

1114
@AutoService(InstrumenterModule.class)
1215
public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@@ -17,6 +20,7 @@ public String[] helperClassNames() {
1720
packageName + ".DatabricksParentContext",
1821
packageName + ".OpenlineageParentContext",
1922
packageName + ".DatadogSpark213Listener",
23+
packageName + ".PredeterminedTraceIdContext",
2024
packageName + ".RemoveEldestHashMap",
2125
packageName + ".SparkAggregatedTaskMetrics",
2226
packageName + ".SparkConfAllowList",
@@ -41,6 +45,34 @@ public void methodAdvice(MethodTransformer transformer) {
4145
public static class InjectListener {
4246
@Advice.OnMethodEnter(suppress = Throwable.class)
4347
public static void enter(@Advice.This SparkContext sparkContext) {
48+
// checking whether OpenLineage integration is enabled, available and that it supports tags
49+
Logger log = LoggerFactory.getLogger("Spark212InjectListener");
50+
if (Config.get().isDataJobsOpenLineageEnabled()
51+
&& AbstractDatadogSparkListener.classIsLoadable(
52+
"io.openlineage.spark.agent.OpenLineageSparkListener")
53+
&& AbstractDatadogSparkListener.classIsLoadable(
54+
"io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) {
55+
if (!sparkContext.conf().contains("spark.extraListeners")) {
56+
log.debug("spark.extraListeners does not contain any listeners. Adding OpenLineage");
57+
sparkContext
58+
.conf()
59+
.set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener");
60+
} else {
61+
String extraListeners = sparkContext.conf().get("spark.extraListeners");
62+
if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) {
63+
log.debug(
64+
"spark.extraListeners does contain listeners {}. Adding OpenLineage",
65+
extraListeners);
66+
sparkContext
67+
.conf()
68+
.set(
69+
"spark.extraListeners",
70+
extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener");
71+
}
72+
}
73+
}
74+
75+
// We want to add the Datadog listener as the first listener
4476
AbstractDatadogSparkListener.listener =
4577
new DatadogSpark213Listener(
4678
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());

0 commit comments

Comments
 (0)