From ffc31bf5e3fac285512e864f4bdf627e8fc21320 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Thu, 13 Feb 2025 11:13:22 +0100 Subject: [PATCH 1/2] set OL if detected Signed-off-by: Maciej Obuchowski --- .../java/datadog/trace/bootstrap/Agent.java | 5 + .../spark/Spark212Instrumentation.java | 36 +++++ .../spark/Spark213Instrumentation.java | 37 +++++ .../spark/AbstractDatadogSparkListener.java | 139 +++++++++++++++--- .../spark/AbstractSparkInstrumentation.java | 37 ++++- .../spark/OpenLineageInstrumentation.java | 104 +++++++++++++ .../spark/PredeterminedTraceIdContext.java | 52 +++++++ .../spark/AbstractSpark24SqlTest.groovy | 1 + .../spark/AbstractSpark32SqlTest.groovy | 1 + ...bstractSparkStructuredStreamingTest.groovy | 1 + .../spark/AbstractSparkTest.groovy | 6 + .../trace/agent/test/AgentTestRunner.groovy | 5 + .../datadog/trace/api/ConfigDefaults.java | 1 + .../trace/api/config/GeneralConfig.java | 1 + .../main/java/datadog/trace/api/Config.java | 8 + .../trace/api/sampling/SamplingMechanism.java | 3 +- 16 files changed, 414 insertions(+), 23 deletions(-) create mode 100644 dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java create mode 100644 dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/PredeterminedTraceIdContext.java diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java index 0157a64dd1d..75ba303744e 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java @@ -250,6 +250,11 @@ public static void start( setSystemPropertyDefault( propertyNameToSystemPropertyName("integration.kafka.enabled"), "true"); + if (Config.get().isDataJobsOpenLineageEnabled()) { + setSystemPropertyDefault( + 
propertyNameToSystemPropertyName("integration.openlineage-spark.enabled"), "true"); + } + String javaCommand = System.getProperty("sun.java.command"); String dataJobsCommandPattern = Config.get().getDataJobsCommandPattern(); if (!isDataJobsSupported(javaCommand, dataJobsCommandPattern)) { diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java index 15e6fa5a80f..e2e9f758d4e 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java @@ -5,8 +5,11 @@ import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.api.Config; import net.bytebuddy.asm.Advice; import org.apache.spark.SparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoService(InstrumenterModule.class) public class Spark212Instrumentation extends AbstractSparkInstrumentation { @@ -17,6 +20,7 @@ public String[] helperClassNames() { packageName + ".DatabricksParentContext", packageName + ".OpenlineageParentContext", packageName + ".DatadogSpark212Listener", + packageName + ".PredeterminedTraceIdContext", packageName + ".RemoveEldestHashMap", packageName + ".SparkAggregatedTaskMetrics", packageName + ".SparkConfAllowList", @@ -41,6 +45,38 @@ public void methodAdvice(MethodTransformer transformer) { public static class InjectListener { @Advice.OnMethodEnter(suppress = Throwable.class) public static void enter(@Advice.This SparkContext sparkContext) { + Logger log = LoggerFactory.getLogger("Spark212InjectListener"); + log.debug( + "AbstractDatadogSparkListener classloader is: ({}) {}", + 
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()), + AbstractDatadogSparkListener.class.getClassLoader()); + + if (Config.get().isDataJobsOpenLineageEnabled() + && AbstractDatadogSparkListener.classIsLoadable( + "io.openlineage.spark.agent.OpenLineageSparkListener") + && AbstractDatadogSparkListener.classIsLoadable( + "io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) { + if (!sparkContext.conf().contains("spark.extraListeners")) { + log.debug("spark.extraListeners does not contain any listeners. Adding OpenLineage"); + sparkContext + .conf() + .set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener"); + } else { + String extraListeners = sparkContext.conf().get("spark.extraListeners"); + if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) { + log.debug( + "spark.extraListeners does contain listeners {}. Adding OpenLineage", + extraListeners); + sparkContext + .conf() + .set( + "spark.extraListeners", + extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener"); + } + } + } + + // We want to add the Datadog listener as the first listener AbstractDatadogSparkListener.listener = new DatadogSpark212Listener( sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version()); diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java index 0d80eb7553c..465fd915809 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java @@ -5,8 +5,11 @@ import com.google.auto.service.AutoService; import 
datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.api.Config; import net.bytebuddy.asm.Advice; import org.apache.spark.SparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoService(InstrumenterModule.class) public class Spark213Instrumentation extends AbstractSparkInstrumentation { @@ -17,6 +20,7 @@ public String[] helperClassNames() { packageName + ".DatabricksParentContext", packageName + ".OpenlineageParentContext", packageName + ".DatadogSpark213Listener", + packageName + ".PredeterminedTraceIdContext", packageName + ".RemoveEldestHashMap", packageName + ".SparkAggregatedTaskMetrics", packageName + ".SparkConfAllowList", @@ -41,6 +45,39 @@ public void methodAdvice(MethodTransformer transformer) { public static class InjectListener { @Advice.OnMethodEnter(suppress = Throwable.class) public static void enter(@Advice.This SparkContext sparkContext) { + // checking whether OpenLineage integration is enabled, available and that it supports tags + Logger log = LoggerFactory.getLogger("Spark213InjectListener"); + log.debug( + "AbstractDatadogSparkListener classloader is: ({}) {}", + System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()), + AbstractDatadogSparkListener.class.getClassLoader()); + + if (Config.get().isDataJobsOpenLineageEnabled() + && AbstractDatadogSparkListener.classIsLoadable( + "io.openlineage.spark.agent.OpenLineageSparkListener") + && AbstractDatadogSparkListener.classIsLoadable( + "io.openlineage.spark.agent.facets.builder.TagsRunFacetBuilder")) { + if (!sparkContext.conf().contains("spark.extraListeners")) { + log.debug("spark.extraListeners does not contain any listeners. 
Adding OpenLineage"); + sparkContext + .conf() + .set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener"); + } else { + String extraListeners = sparkContext.conf().get("spark.extraListeners"); + if (!extraListeners.contains("io.openlineage.spark.agent.OpenLineageSparkListener")) { + log.debug( + "spark.extraListeners does contain listeners {}. Adding OpenLineage", + extraListeners); + sparkContext + .conf() + .set( + "spark.extraListeners", + extraListeners + ",io.openlineage.spark.agent.OpenLineageSparkListener"); + } + } + } + + // We want to add the Datadog listener as the first listener AbstractDatadogSparkListener.listener = new DatadogSpark213Listener( sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version()); diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 8da8865c982..61d4dad4607 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -36,6 +36,7 @@ import java.util.Optional; import java.util.Properties; import java.util.UUID; +import java.util.function.Consumer; import org.apache.spark.ExceptionFailure; import org.apache.spark.SparkConf; import org.apache.spark.TaskFailedReason; @@ -68,12 +69,16 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class); private static final ObjectMapper objectMapper = new ObjectMapper(); public static volatile AbstractDatadogSparkListener listener = null; + public static volatile SparkListenerInterface openLineageSparkListener = null; + public static 
volatile SparkConf openLineageSparkConf = null; + public static volatile boolean finishTraceOnApplicationEnd = true; public static volatile boolean isPysparkShell = false; private final int MAX_COLLECTION_SIZE = 5000; private final int MAX_ACCUMULATOR_SIZE = 50000; private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags."; + private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage"; private final SparkConf sparkConf; private final String sparkVersion; @@ -81,6 +86,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private final AgentTracer.TracerAPI tracer; + // This is created by constructor, and used if we're not in other known + // parent context like Databricks, OpenLineage + private final PredeterminedTraceIdContext predeterminedTraceIdContext; + private AgentSpan applicationSpan; private SparkListenerApplicationStart applicationStart; private final HashMap streamingBatchSpans = new HashMap<>(); @@ -109,6 +118,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private final Map accumulators = new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE); + private volatile boolean isStreamingJob = false; private final boolean isRunningOnDatabricks; private final String databricksClusterName; private final String databricksServiceName; @@ -135,6 +145,8 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null); databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName); sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks); + predeterminedTraceIdContext = + new PredeterminedTraceIdContext(Config.get().getIdGenerationStrategy().generateTraceId()); // If JVM exiting with System.exit(code), it bypass the code closing the application span // @@ -151,8 +163,33 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, 
String sp finishApplication(System.currentTimeMillis(), null, 0, null); } })); + } - log.info("Created datadog spark listener: {}", this.getClass().getSimpleName()); + public void setupOpenLineage(DDTraceId traceId) { + log.debug("Setting up OpenLineage configuration"); + if (openLineageSparkListener != null) { + openLineageSparkConf.set("spark.openlineage.transport.type", "composite"); + openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true"); + openLineageSparkConf.set("spark.openlineage.transport.transports.agent.type", "http"); + openLineageSparkConf.set( + "spark.openlineage.transport.transports.agent.url", getAgentHttpUrl()); + openLineageSparkConf.set( + "spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT); + openLineageSparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip"); + openLineageSparkConf.set( + "spark.openlineage.run.tags", + "_dd.trace_id:" + + traceId.toString() + + ";_dd.ol_intake.emit_spans:false;dd.ol_service:" + + sparkServiceName); + return; + } + log.debug( + "There is no OpenLineage Spark Listener in the context. Skipping setting tags. {}", + openLineageSparkListener); + log.debug( + "There is no OpenLineage SparkConf in the context. Skipping setting tags. {}", + openLineageSparkConf); } /** Resource name of the spark job. 
Provide an implementation based on a specific scala version */ @@ -176,6 +213,14 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp @Override public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) { this.applicationStart = applicationStart; + + if (openLineageSparkListener != null) { + setupOpenLineage( + OpenlineageParentContext.from(sparkConf) + .map(context -> context.getTraceId()) + .orElse(predeterminedTraceIdContext.getTraceId())); + } + notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart); } private void initApplicationSpanIfNotInitialized() { @@ -183,6 +228,8 @@ private void initApplicationSpanIfNotInitialized() { return; } + log.debug("Starting tracer application span."); + AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null); if (applicationStart != null) { @@ -197,35 +244,36 @@ private void initApplicationSpanIfNotInitialized() { } captureApplicationParameters(builder); - captureOpenlineageContextIfPresent(builder); + + Optional openlineageParentContext = + OpenlineageParentContext.from(sparkConf); + // We know we're not in Databricks context + if (openlineageParentContext.isPresent()) { + captureOpenlineageContextIfPresent(builder, openlineageParentContext.get()); + } else { + builder.asChildOf(predeterminedTraceIdContext); + } applicationSpan = builder.start(); setDataJobsSamplingPriority(applicationSpan); applicationSpan.setMeasured(true); } - private void captureOpenlineageContextIfPresent(AgentTracer.SpanBuilder builder) { - Optional openlineageParentContext = - OpenlineageParentContext.from(sparkConf); + private void captureOpenlineageContextIfPresent( + AgentTracer.SpanBuilder builder, OpenlineageParentContext context) { + builder.asChildOf(context); - if (openlineageParentContext.isPresent()) { - OpenlineageParentContext context = openlineageParentContext.get(); - builder.asChildOf(context); + 
builder.withSpanId(context.getChildRootSpanId()); - builder.withSpanId(context.getChildRootSpanId()); + log.debug( + "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}", + context, + context.getTraceId(), + context.getChildRootSpanId()); - log.debug( - "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}", - context, - context.getTraceId(), - context.getChildRootSpanId()); - - builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace()); - builder.withTag("openlineage_parent_job_name", context.getParentJobName()); - builder.withTag("openlineage_parent_run_id", context.getParentRunId()); - } else { - log.debug("Openlineage context not found"); - } + builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace()); + builder.withTag("openlineage_parent_job_name", context.getParentJobName()); + builder.withTag("openlineage_parent_run_id", context.getParentRunId()); } @Override @@ -233,6 +281,7 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { log.info( "Received spark application end event, finish trace on this event: {}", finishTraceOnApplicationEnd); + notifyOl(x -> openLineageSparkListener.onApplicationEnd(x), applicationEnd); if (finishTraceOnApplicationEnd) { finishApplication(applicationEnd.time(), null, 0, null); @@ -407,6 +456,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { if (sqlSpan != null) { jobSpanBuilder.asChildOf(sqlSpan.context()); } else if (batchKey != null) { + isStreamingJob = true; AgentSpan batchSpan = getOrCreateStreamingBatchSpan(batchKey, jobStart.time(), jobStart.properties()); jobSpanBuilder.asChildOf(batchSpan.context()); @@ -428,6 +478,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { stageToJob.put(stageId, jobStart.jobId()); } jobSpans.put(jobStart.jobId(), jobSpan); + notifyOl(x -> openLineageSparkListener.onJobStart(x), jobStart); } @Override @@ -458,6 
+509,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { if (metrics != null) { metrics.setSpanMetrics(jobSpan); } + notifyOl(x -> openLineageSparkListener.onJobEnd(x), jobEnd); jobSpan.finish(jobEnd.time() * 1000); } @@ -626,6 +678,21 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { Properties props = stageProperties.get(stageSpanKey); sendTaskSpan(stageSpan, taskEnd, props); + + notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd); + } + + public static boolean classIsLoadable(String className) { + try { + Class.forName( + className, + false, + Optional.ofNullable(Thread.currentThread().getContextClassLoader()) + .orElse(SparkConf.class.getClassLoader())); + return true; + } catch (ClassNotFoundException e) { + return false; + } } private void sendTaskSpan( @@ -707,6 +774,25 @@ public void onOtherEvent(SparkListenerEvent event) { updateAdaptiveSQLPlan(event); } + private void notifyOl(Consumer ol, T event) { + if (!Config.get().isDataJobsOpenLineageEnabled()) { + log.debug("Ignoring event {} - OpenLineage not enabled", event); + return; + } + + if (isRunningOnDatabricks || isStreamingJob) { + log.debug("Not emitting event when running on databricks or on streaming jobs"); + return; + } + if (openLineageSparkListener != null) { + log.debug( + "Passing event `{}` to OpenLineageSparkListener", event.getClass().getCanonicalName()); + ol.accept(event); + } else { + log.debug("OpenLineageSparkListener is null"); + } + } + private static final Class adaptiveExecutionUpdateClass; private static final MethodHandle adaptiveExecutionIdMethod; private static final MethodHandle adaptiveSparkPlanMethod; @@ -755,6 +841,7 @@ private synchronized void updateAdaptiveSQLPlan(SparkListenerEvent event) { private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sqlStart) { sqlPlans.put(sqlStart.executionId(), sqlStart.sparkPlanInfo()); sqlQueries.put(sqlStart.executionId(), sqlStart); + notifyOl(x -> 
openLineageSparkListener.onOtherEvent(x), sqlStart); } private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) { @@ -767,6 +854,7 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) if (metrics != null) { metrics.setSpanMetrics(span); } + notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlEnd); span.finish(sqlEnd.time() * 1000); } @@ -1262,6 +1350,15 @@ private static String getDatabricksRunName(SparkConf conf) { return null; } + private static String getAgentHttpUrl() { + StringBuilder sb = + new StringBuilder("http://") + .append(Config.get().getAgentHost()) + .append(":") + .append(Config.get().getAgentPort()); + return sb.toString(); + } + @SuppressForbidden // called at most once per spark application private static String removeUuidFromEndOfString(String input) { return input.replaceAll( diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java index 8bfe1ae8000..5b0d60552af 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java @@ -8,8 +8,12 @@ import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.api.Config; import net.bytebuddy.asm.Advice; import org.apache.spark.deploy.SparkSubmitArguments; +import org.apache.spark.scheduler.SparkListenerInterface; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractSparkInstrumentation extends InstrumenterModule.Tracing implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice { @@ -28,7 +32,10 @@ public String[] knownMatchingTypes() { return new 
String[] { "org.apache.spark.SparkContext", "org.apache.spark.deploy.SparkSubmit", - "org.apache.spark.deploy.yarn.ApplicationMaster" + "org.apache.spark.deploy.yarn.ApplicationMaster", + "org.apache.spark.util.Utils", + "org.apache.spark.util.SparkClassUtils", + "org.apache.spark.scheduler.LiveListenerBus" }; } @@ -55,6 +62,14 @@ public void methodAdvice(MethodTransformer transformer) { .and(named("finish")) .and(isDeclaredBy(named("org.apache.spark.deploy.yarn.ApplicationMaster"))), AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice"); + + // LiveListenerBus class is used to manage spark listeners + transformer.applyAdvice( + isMethod() + .and(named("addToSharedQueue")) + .and(takesArgument(0, named("org.apache.spark.scheduler.SparkListenerInterface"))) + .and(isDeclaredBy(named("org.apache.spark.scheduler.LiveListenerBus"))), + AbstractSparkInstrumentation.class.getName() + "$LiveListenerBusAdvice"); } public static class PrepareSubmitEnvAdvice { @@ -100,4 +115,24 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S } } } + + public static class LiveListenerBusAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) + // If OL is disabled in tracer config but user set it up manually don't interfere + public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) { + Logger log = LoggerFactory.getLogger("LiveListenerBusAdvice"); + log.debug( + "AbstractDatadogSparkListener classloader for LiveListenerBusAdvice is: ({}) {}", + System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()), + AbstractDatadogSparkListener.class.getClassLoader()); + if (Config.get().isDataJobsOpenLineageEnabled() + && listener != null + && "io.openlineage.spark.agent.OpenLineageSparkListener" + .equals(listener.getClass().getCanonicalName())) { + log.debug("Detected OpenLineage listener, skipping adding it to ListenerBus"); + return true; + } + return false; + } 
+ } } diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java new file mode 100644 index 00000000000..52d17a4b5db --- /dev/null +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java @@ -0,0 +1,104 @@ +package datadog.trace.instrumentation.spark; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.api.Config; +import java.lang.reflect.Field; +import net.bytebuddy.asm.Advice; +import org.apache.spark.SparkConf; +import org.apache.spark.scheduler.SparkListenerInterface; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@AutoService(InstrumenterModule.class) +public class OpenLineageInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice { + + public OpenLineageInstrumentation() { + super("openlineage-spark"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".AbstractDatadogSparkListener", + packageName + ".DatabricksParentContext", + packageName + ".OpenlineageParentContext", + packageName + ".PredeterminedTraceIdContext", + packageName + ".RemoveEldestHashMap", + packageName + ".SparkAggregatedTaskMetrics", + packageName + ".SparkConfAllowList", + packageName + ".SparkSQLUtils", + packageName + ".SparkSQLUtils$SparkPlanInfoForStage", + packageName + 
".SparkSQLUtils$AccumulatorWithStage", + }; + } + + @Override + public boolean defaultEnabled() { + return false; + } + + @Override + public String[] knownMatchingTypes() { + return new String[] {"io.openlineage.spark.agent.OpenLineageSparkListener"}; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + // LiveListenerBus class is used when running in a YARN cluster + transformer.applyAdvice( + isConstructor() + .and(isDeclaredBy(named("io.openlineage.spark.agent.OpenLineageSparkListener"))) + .and(takesArgument(0, named("org.apache.spark.SparkConf"))), + OpenLineageInstrumentation.class.getName() + "$OpenLineageSparkListenerAdvice"); + } + + public static class OpenLineageSparkListenerAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void exit(@Advice.This Object self, @Advice.FieldValue("conf") SparkConf conf) + throws IllegalAccessException { + Logger log = LoggerFactory.getLogger("OpenLineageSparkListenerAdvice"); + if (!Config.get().isDataJobsOpenLineageEnabled()) { + log.debug( + "OpenLineage - Data Jobs integration disabled. Not manipulating OpenLineageSparkListener"); + return; + } + + log.debug("Checking for OpenLineageSparkListener"); + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + if (cl.getClass().getName().contains("MutableURLClassLoader") + || cl.getClass().getName().contains("ChildFirstURLClassLoader")) { + log.debug( + "Detected MutableURLClassLoader. 
Setting OpenLineage on AbstractDatadogSparkListener.class of parent classloader"); + try { + log.debug( + "Parent classloader: ({}) {}", + System.identityHashCode(cl.getParent()), + cl.getParent()); + Class clazz = cl.getParent().loadClass(AbstractDatadogSparkListener.class.getName()); + Field openLineageSparkListener = clazz.getDeclaredField("openLineageSparkListener"); + openLineageSparkListener.setAccessible(true); + openLineageSparkListener.set(null, self); + + Field openLineageSparkConf = clazz.getDeclaredField("openLineageSparkConf"); + openLineageSparkConf.setAccessible(true); + openLineageSparkConf.set(null, conf); + } catch (Throwable e) { + log.info("Failed to setup OpenLineage", e); + } + } else { + log.debug( + "Detected other classloader than MutableURLClassLoader. Setting OpenLineage on AbstractDatadogSparkListener.class"); + AbstractDatadogSparkListener.openLineageSparkListener = (SparkListenerInterface) self; + AbstractDatadogSparkListener.openLineageSparkConf = conf; + } + } + } +} diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/PredeterminedTraceIdContext.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/PredeterminedTraceIdContext.java new file mode 100644 index 00000000000..51164f8bbc8 --- /dev/null +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/PredeterminedTraceIdContext.java @@ -0,0 +1,52 @@ +package datadog.trace.instrumentation.spark; + +import datadog.trace.api.DDTraceId; +import datadog.trace.api.datastreams.PathwayContext; +import datadog.trace.api.sampling.PrioritySampling; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; +import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import java.util.Map; + +public class PredeterminedTraceIdContext implements AgentSpanContext { + private final DDTraceId 
traceId; + + public PredeterminedTraceIdContext(DDTraceId traceId) { + this.traceId = traceId; + } + + @Override + public DDTraceId getTraceId() { + return this.traceId; + } + + @Override + public long getSpanId() { + return 0; + } + + @Override + public AgentTraceCollector getTraceCollector() { + return AgentTracer.NoopAgentTraceCollector.INSTANCE; + } + + @Override + public int getSamplingPriority() { + return PrioritySampling.USER_KEEP; + } + + @Override + public Iterable> baggageItems() { + return null; + } + + @Override + public PathwayContext getPathwayContext() { + return null; + } + + @Override + public boolean isRemote() { + return false; + } +} diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy index f1634a54ce6..e839499b7fd 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy @@ -14,6 +14,7 @@ abstract class AbstractSpark24SqlTest extends AgentTestRunner { void configurePreAgent() { super.configurePreAgent() injectSysConfig("dd.integration.spark.enabled", "true") + injectSysConfig("dd.integration.openlineage-spark.enabled", "true") } static Dataset generateSampleDataframe(SparkSession spark) { diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy index 0f42dd58f50..b8331ffd33c 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy +++ 
b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy @@ -10,6 +10,7 @@ abstract class AbstractSpark32SqlTest extends AgentTestRunner { void configurePreAgent() { super.configurePreAgent() injectSysConfig("dd.integration.spark.enabled", "true") + injectSysConfig("dd.integration.openlineage-spark.enabled", "true") } def "compute a GROUP BY sql query plan"() { diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy index 38ea89e0ca0..cace414208d 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy @@ -25,6 +25,7 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner { void configurePreAgent() { super.configurePreAgent() injectSysConfig("dd.integration.spark.enabled", "true") + injectSysConfig("dd.integration.openlineage-spark.enabled", "true") } private SparkSession createSparkSession(String appName) { diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy index 36ae6ab5fe0..368ef17c276 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy @@ -25,11 +25,16 @@ import spock.lang.IgnoreIf Platform.isJ9() 
}) abstract class AbstractSparkTest extends AgentTestRunner { + @Override + protected boolean isDataJobsEnabled() { + return true + } @Override void configurePreAgent() { super.configurePreAgent() injectSysConfig("dd.integration.spark.enabled", "true") + injectSysConfig("dd.integration.openlineage-spark.enabled", "true") } def "generate application span with child job and stages"() { @@ -53,6 +58,7 @@ abstract class AbstractSparkTest extends AgentTestRunner { resourceName "spark.application" spanType "spark" errored false + assert span.context().getTraceId() != DDTraceId.ZERO assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString() parent() diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy index a484826e84a..029024f2a35 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy @@ -272,6 +272,10 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L return false } + protected boolean isDataJobsEnabled() { + return false + } + protected long dataStreamsBucketDuration() { TimeUnit.MILLISECONDS.toNanos(50) } @@ -464,6 +468,7 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L protected void configurePreAgent() { injectSysConfig(TracerConfig.SCOPE_ITERATION_KEEP_ALIVE, "1") // don't let iteration spans linger injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, String.valueOf(isDataStreamsEnabled())) + injectSysConfig(GeneralConfig.DATA_JOBS_ENABLED, String.valueOf(isDataJobsEnabled())) } void setup() { diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java 
b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java index 230ec4efd54..37931a666f5 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java @@ -224,6 +224,7 @@ public final class ConfigDefaults { static final int DEFAULT_CWS_TLS_REFRESH = 5000; static final boolean DEFAULT_DATA_JOBS_ENABLED = false; + static final boolean DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED = false; static final boolean DEFAULT_DATA_STREAMS_ENABLED = false; static final int DEFAULT_DATA_STREAMS_BUCKET_DURATION = 10; // seconds diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java index a882a097e59..e49baba90e0 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java @@ -69,6 +69,7 @@ public final class GeneralConfig { public static final String DATA_JOBS_ENABLED = "data.jobs.enabled"; public static final String DATA_JOBS_COMMAND_PATTERN = "data.jobs.command.pattern"; + public static final String DATA_JOBS_OPENLINEAGE_ENABLED = "data.jobs.openlineage.enabled"; public static final String DATA_STREAMS_ENABLED = "data.streams.enabled"; public static final String DATA_STREAMS_BUCKET_DURATION_SECONDS = diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index c2218e01ef7..01429f7f9fb 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -501,6 +501,7 @@ public static String getHostName() { private final boolean dataJobsEnabled; private final String dataJobsCommandPattern; + private final boolean dataJobsOpenLineageEnabled; private final boolean dataStreamsEnabled; private final float dataStreamsBucketDurationSeconds; @@ -1833,6 +1834,9 @@ 
PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) cwsTlsRefresh = configProvider.getInteger(CWS_TLS_REFRESH, DEFAULT_CWS_TLS_REFRESH); dataJobsEnabled = configProvider.getBoolean(DATA_JOBS_ENABLED, DEFAULT_DATA_JOBS_ENABLED); + dataJobsOpenLineageEnabled = + configProvider.getBoolean( + DATA_JOBS_OPENLINEAGE_ENABLED, DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED); dataJobsCommandPattern = configProvider.getString(DATA_JOBS_COMMAND_PATTERN); dataStreamsEnabled = @@ -3605,6 +3609,10 @@ public boolean isDataJobsEnabled() { return dataJobsEnabled; } + public boolean isDataJobsOpenLineageEnabled() { + return dataJobsOpenLineageEnabled; + } + public String getDataJobsCommandPattern() { return dataJobsCommandPattern; } diff --git a/internal-api/src/main/java/datadog/trace/api/sampling/SamplingMechanism.java b/internal-api/src/main/java/datadog/trace/api/sampling/SamplingMechanism.java index b21855bdb3e..ca0e3038bb3 100644 --- a/internal-api/src/main/java/datadog/trace/api/sampling/SamplingMechanism.java +++ b/internal-api/src/main/java/datadog/trace/api/sampling/SamplingMechanism.java @@ -69,7 +69,8 @@ public static boolean validateWithSamplingPriority(int mechanism, int priority) * @return */ public static boolean canAvoidSamplingPriorityLock(int priority, int mechanism) { - return !Config.get().isApmTracingEnabled() && mechanism == SamplingMechanism.APPSEC; + return (!Config.get().isApmTracingEnabled() && mechanism == SamplingMechanism.APPSEC) + || (Config.get().isDataJobsEnabled() && mechanism == DATA_JOBS); } private SamplingMechanism() {} From 83cf0931827cc95cfca09662845337e6ed317664 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Mon, 31 Mar 2025 13:02:14 +0200 Subject: [PATCH 2/2] classloader and code review fixes Signed-off-by: Maciej Obuchowski --- .../java/datadog/trace/bootstrap/Agent.java | 2 +- .../spark/Spark212Instrumentation.java | 5 --- .../spark/Spark213Instrumentation.java | 5 --- .../spark/AbstractDatadogSparkListener.java 
| 18 +++++++--- .../spark/AbstractSparkInstrumentation.java | 6 ++-- .../spark/OpenLineageInstrumentation.java | 35 ++----------------- .../spark/AbstractSpark32SqlTest.groovy | 2 +- ...bstractSparkStructuredStreamingTest.groovy | 2 +- .../spark/AbstractSparkTest.groovy | 2 +- 9 files changed, 23 insertions(+), 54 deletions(-) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java index 75ba303744e..64e38884ef8 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java @@ -252,7 +252,7 @@ public static void start( if (Config.get().isDataJobsOpenLineageEnabled()) { setSystemPropertyDefault( - propertyNameToSystemPropertyName("integration.openlineage-spark.enabled"), "true"); + propertyNameToSystemPropertyName("integration.spark-openlineage.enabled"), "true"); } String javaCommand = System.getProperty("sun.java.command"); diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java index e2e9f758d4e..1e5dba63bfb 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java @@ -46,11 +46,6 @@ public static class InjectListener { @Advice.OnMethodEnter(suppress = Throwable.class) public static void enter(@Advice.This SparkContext sparkContext) { Logger log = LoggerFactory.getLogger("Spark212InjectListener"); - log.debug( - "AbstractDatadogSparkListener classloader is: ({}) {}", - 
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()), - AbstractDatadogSparkListener.class.getClassLoader()); - if (Config.get().isDataJobsOpenLineageEnabled() && AbstractDatadogSparkListener.classIsLoadable( "io.openlineage.spark.agent.OpenLineageSparkListener") diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java index 465fd915809..1b9f19e7178 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java @@ -47,11 +47,6 @@ public static class InjectListener { public static void enter(@Advice.This SparkContext sparkContext) { // checking whether OpenLineage integration is enabled, available and that it supports tags Logger log = LoggerFactory.getLogger("Spark212InjectListener"); - log.debug( - "AbstractDatadogSparkListener classloader is: ({}) {}", - System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()), - AbstractDatadogSparkListener.class.getClassLoader()); - if (Config.get().isDataJobsOpenLineageEnabled() && AbstractDatadogSparkListener.classIsLoadable( "io.openlineage.spark.agent.OpenLineageSparkListener") diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 61d4dad4607..39dbba574f3 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ 
b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -12,6 +12,7 @@ import datadog.trace.api.DDTraceId; import datadog.trace.api.sampling.PrioritySampling; import datadog.trace.api.sampling.SamplingMechanism; +import datadog.trace.bootstrap.InstanceStore; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; @@ -69,8 +70,6 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class); private static final ObjectMapper objectMapper = new ObjectMapper(); public static volatile AbstractDatadogSparkListener listener = null; - public static volatile SparkListenerInterface openLineageSparkListener = null; - public static volatile SparkConf openLineageSparkConf = null; public static volatile boolean finishTraceOnApplicationEnd = true; public static volatile boolean isPysparkShell = false; @@ -80,6 +79,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags."; private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage"; + public volatile SparkListenerInterface openLineageSparkListener = null; + public volatile SparkConf openLineageSparkConf = null; + private final SparkConf sparkConf; private final String sparkVersion; private final String appId; @@ -180,8 +182,10 @@ public void setupOpenLineage(DDTraceId traceId) { "spark.openlineage.run.tags", "_dd.trace_id:" + traceId.toString() - + ";_dd.ol_intake.emit_spans:false;dd.ol_service:" - + sparkServiceName); + + ";_dd.ol_intake.emit_spans:false;_dd.ol_service:" + + sparkServiceName + + ";_dd.ol_app_id:" + + appId); return; } log.debug( @@ -214,6 +218,12 @@ public void setupOpenLineage(DDTraceId traceId) { 
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) { this.applicationStart = applicationStart; + if (openLineageSparkListener == null) { + openLineageSparkListener = + InstanceStore.of(SparkListenerInterface.class).get("openLineageListener"); + openLineageSparkConf = InstanceStore.of(SparkConf.class).get("openLineageSparkConf"); + } + if (openLineageSparkListener != null) { setupOpenLineage( OpenlineageParentContext.from(sparkConf) diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java index 5b0d60552af..102f3a65ccd 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java @@ -9,6 +9,7 @@ import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.api.Config; +import datadog.trace.bootstrap.InstanceStore; import net.bytebuddy.asm.Advice; import org.apache.spark.deploy.SparkSubmitArguments; import org.apache.spark.scheduler.SparkListenerInterface; @@ -121,15 +122,12 @@ public static class LiveListenerBusAdvice { // If OL is disabled in tracer config but user set it up manually don't interfere public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) { Logger log = LoggerFactory.getLogger("LiveListenerBusAdvice"); - log.debug( - "AbstractDatadogSparkListener classloader for LiveListenerBusAdvice is: ({}) {}", - System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()), - AbstractDatadogSparkListener.class.getClassLoader()); if (Config.get().isDataJobsOpenLineageEnabled() && listener != null && 
"io.openlineage.spark.agent.OpenLineageSparkListener" .equals(listener.getClass().getCanonicalName())) { log.debug("Detected OpenLineage listener, skipping adding it to ListenerBus"); + InstanceStore.of(SparkListenerInterface.class).put("openLineageListener", listener); return true; } return false; diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java index 52d17a4b5db..2eedacfceae 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java @@ -9,10 +9,9 @@ import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.api.Config; -import java.lang.reflect.Field; +import datadog.trace.bootstrap.InstanceStore; import net.bytebuddy.asm.Advice; import org.apache.spark.SparkConf; -import org.apache.spark.scheduler.SparkListenerInterface; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,7 +20,7 @@ public class OpenLineageInstrumentation extends InstrumenterModule.Tracing implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice { public OpenLineageInstrumentation() { - super("openlineage-spark"); + super("spark-openlineage"); } @Override @@ -70,35 +69,7 @@ public static void exit(@Advice.This Object self, @Advice.FieldValue("conf") Spa "OpenLineage - Data Jobs integration disabled. 
Not manipulating OpenLineageSparkListener"); return; } - - log.debug("Checking for OpenLineageSparkListener"); - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - if (cl.getClass().getName().contains("MutableURLClassLoader") - || cl.getClass().getName().contains("ChildFirstURLClassLoader")) { - log.debug( - "Detected MutableURLClassLoader. Setting OpenLineage on AbstractDatadogSparkListener.class of parent classloader"); - try { - log.debug( - "Parent classloader: ({}) {}", - System.identityHashCode(cl.getParent()), - cl.getParent()); - Class clazz = cl.getParent().loadClass(AbstractDatadogSparkListener.class.getName()); - Field openLineageSparkListener = clazz.getDeclaredField("openLineageSparkListener"); - openLineageSparkListener.setAccessible(true); - openLineageSparkListener.set(null, self); - - Field openLineageSparkConf = clazz.getDeclaredField("openLineageSparkConf"); - openLineageSparkConf.setAccessible(true); - openLineageSparkConf.set(null, conf); - } catch (Throwable e) { - log.info("Failed to setup OpenLineage", e); - } - } else { - log.debug( - "Detected other classloader than MutableURLClassLoader. 
Setting OpenLineage on AbstractDatadogSparkListener.class"); - AbstractDatadogSparkListener.openLineageSparkListener = (SparkListenerInterface) self; - AbstractDatadogSparkListener.openLineageSparkConf = conf; - } + InstanceStore.of(SparkConf.class).put("openLineageSparkConf", conf); } } } diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy index b8331ffd33c..b71c1be3eda 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy @@ -10,7 +10,7 @@ abstract class AbstractSpark32SqlTest extends AgentTestRunner { void configurePreAgent() { super.configurePreAgent() injectSysConfig("dd.integration.spark.enabled", "true") - injectSysConfig("dd.integration.openlineage-spark.enabled", "true") + injectSysConfig("dd.integration.spark-openlineage.enabled", "true") } def "compute a GROUP BY sql query plan"() { diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy index cace414208d..725ff7881b0 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy @@ -25,7 +25,7 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner { void configurePreAgent() { 
super.configurePreAgent() injectSysConfig("dd.integration.spark.enabled", "true") - injectSysConfig("dd.integration.openlineage-spark.enabled", "true") + injectSysConfig("dd.integration.spark-openlineage.enabled", "true") } private SparkSession createSparkSession(String appName) { diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy index 368ef17c276..b891d7ebed3 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy @@ -34,7 +34,7 @@ abstract class AbstractSparkTest extends AgentTestRunner { void configurePreAgent() { super.configurePreAgent() injectSysConfig("dd.integration.spark.enabled", "true") - injectSysConfig("dd.integration.openlineage-spark.enabled", "true") + injectSysConfig("dd.integration.spark-openlineage.enabled", "true") } def "generate application span with child job and stages"() {