Skip to content

Commit 5cd3c14

Browse files
committed
classloader and code review fixes
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 46ebe3b commit 5cd3c14

File tree

5 files changed

+11
-45
lines changed

5 files changed

+11
-45
lines changed

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,6 @@ public static class InjectListener {
4646
@Advice.OnMethodEnter(suppress = Throwable.class)
4747
public static void enter(@Advice.This SparkContext sparkContext) {
4848
Logger log = LoggerFactory.getLogger("Spark212InjectListener");
49-
log.debug(
50-
"AbstractDatadogSparkListener classloader is: ({}) {}",
51-
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
52-
AbstractDatadogSparkListener.class.getClassLoader());
53-
5449
if (Config.get().isDataJobsOpenLineageEnabled()
5550
&& AbstractDatadogSparkListener.classIsLoadable(
5651
"io.openlineage.spark.agent.OpenLineageSparkListener")

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,6 @@ public static class InjectListener {
4747
public static void enter(@Advice.This SparkContext sparkContext) {
4848
// checking whether OpenLineage integration is enabled, available and that it supports tags
4949
Logger log = LoggerFactory.getLogger("Spark212InjectListener");
50-
log.debug(
51-
"AbstractDatadogSparkListener classloader is: ({}) {}",
52-
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
53-
AbstractDatadogSparkListener.class.getClassLoader());
54-
5550
if (Config.get().isDataJobsOpenLineageEnabled()
5651
&& AbstractDatadogSparkListener.classIsLoadable(
5752
"io.openlineage.spark.agent.OpenLineageSparkListener")

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import datadog.trace.api.DDTraceId;
1313
import datadog.trace.api.sampling.PrioritySampling;
1414
import datadog.trace.api.sampling.SamplingMechanism;
15+
import datadog.trace.bootstrap.InstanceStore;
1516
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1617
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
1718
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
@@ -214,6 +215,12 @@ public void setupOpenLineage(DDTraceId traceId) {
214215
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
215216
this.applicationStart = applicationStart;
216217

218+
if (openLineageSparkListener == null) {
219+
openLineageSparkListener =
220+
InstanceStore.of(SparkListenerInterface.class).get("openLineageListener");
221+
openLineageSparkConf = InstanceStore.of(SparkConf.class).get("openLineageSparkConf");
222+
}
223+
217224
if (openLineageSparkListener != null) {
218225
setupOpenLineage(
219226
OpenlineageParentContext.from(sparkConf)

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import datadog.trace.agent.tooling.Instrumenter;
1010
import datadog.trace.agent.tooling.InstrumenterModule;
1111
import datadog.trace.api.Config;
12+
import datadog.trace.bootstrap.InstanceStore;
1213
import net.bytebuddy.asm.Advice;
1314
import org.apache.spark.deploy.SparkSubmitArguments;
1415
import org.apache.spark.scheduler.SparkListenerInterface;
@@ -121,15 +122,12 @@ public static class LiveListenerBusAdvice {
121122
// If OL is disabled in tracer config but user set it up manually don't interfere
122123
public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) {
123124
Logger log = LoggerFactory.getLogger("LiveListenerBusAdvice");
124-
log.debug(
125-
"AbstractDatadogSparkListener classloader for LiveListenerBusAdvice is: ({}) {}",
126-
System.identityHashCode(AbstractDatadogSparkListener.class.getClassLoader()),
127-
AbstractDatadogSparkListener.class.getClassLoader());
128125
if (Config.get().isDataJobsOpenLineageEnabled()
129126
&& listener != null
130127
&& "io.openlineage.spark.agent.OpenLineageSparkListener"
131128
.equals(listener.getClass().getCanonicalName())) {
132129
log.debug("Detected OpenLineage listener, skipping adding it to ListenerBus");
130+
InstanceStore.of(SparkListenerInterface.class).put("openLineageListener", listener);
133131
return true;
134132
}
135133
return false;

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
import datadog.trace.agent.tooling.Instrumenter;
1010
import datadog.trace.agent.tooling.InstrumenterModule;
1111
import datadog.trace.api.Config;
12-
import java.lang.reflect.Field;
12+
import datadog.trace.bootstrap.InstanceStore;
1313
import net.bytebuddy.asm.Advice;
1414
import org.apache.spark.SparkConf;
15-
import org.apache.spark.scheduler.SparkListenerInterface;
1615
import org.slf4j.Logger;
1716
import org.slf4j.LoggerFactory;
1817

@@ -70,35 +69,7 @@ public static void exit(@Advice.This Object self, @Advice.FieldValue("conf") Spa
7069
"OpenLineage - Data Jobs integration disabled. Not manipulating OpenLineageSparkListener");
7170
return;
7271
}
73-
74-
log.debug("Checking for OpenLineageSparkListener");
75-
ClassLoader cl = Thread.currentThread().getContextClassLoader();
76-
if (cl.getClass().getName().contains("MutableURLClassLoader")
77-
|| cl.getClass().getName().contains("ChildFirstURLClassLoader")) {
78-
log.debug(
79-
"Detected MutableURLClassLoader. Setting OpenLineage on AbstractDatadogSparkListener.class of parent classloader");
80-
try {
81-
log.debug(
82-
"Parent classloader: ({}) {}",
83-
System.identityHashCode(cl.getParent()),
84-
cl.getParent());
85-
Class clazz = cl.getParent().loadClass(AbstractDatadogSparkListener.class.getName());
86-
Field openLineageSparkListener = clazz.getDeclaredField("openLineageSparkListener");
87-
openLineageSparkListener.setAccessible(true);
88-
openLineageSparkListener.set(null, self);
89-
90-
Field openLineageSparkConf = clazz.getDeclaredField("openLineageSparkConf");
91-
openLineageSparkConf.setAccessible(true);
92-
openLineageSparkConf.set(null, conf);
93-
} catch (Throwable e) {
94-
log.info("Failed to setup OpenLineage", e);
95-
}
96-
} else {
97-
log.debug(
98-
"Detected other classloader than MutableURLClassLoader. Setting OpenLineage on AbstractDatadogSparkListener.class");
99-
AbstractDatadogSparkListener.openLineageSparkListener = (SparkListenerInterface) self;
100-
AbstractDatadogSparkListener.openLineageSparkConf = conf;
101-
}
72+
InstanceStore.of(SparkConf.class).put("openLineageSparkConf", conf);
10273
}
10374
}
10475
}

0 commit comments

Comments
 (0)