Skip to content

Commit d48bb91

Browse files
committed
set OL if detected
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent a39a27f commit d48bb91

File tree

2 files changed

+152
-2
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,12 @@
3333
import java.util.LinkedHashMap;
3434
import java.util.List;
3535
import java.util.Map;
36+
import java.util.Objects;
3637
import java.util.Optional;
3738
import java.util.Properties;
3839
import java.util.UUID;
40+
import java.util.function.Consumer;
41+
import java.util.stream.Collectors;
3942
import org.apache.spark.ExceptionFailure;
4043
import org.apache.spark.SparkConf;
4144
import org.apache.spark.TaskFailedReason;
@@ -74,6 +77,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
7477
private final int MAX_COLLECTION_SIZE = 5000;
7578
private final int MAX_ACCUMULATOR_SIZE = 50000;
7679
private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags.";
80+
private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage";
7781

7882
private final SparkConf sparkConf;
7983
private final String sparkVersion;
@@ -123,6 +127,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
123127
private long availableExecutorTime = 0;
124128

125129
private volatile boolean applicationEnded = false;
130+
private SparkListener openLineageSparkListener = null;
126131

127132
public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sparkVersion) {
128133
tracer = AgentTracer.get();
@@ -151,8 +156,50 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
151156
finishApplication(System.currentTimeMillis(), null, 0, null);
152157
}
153158
}));
159+
initApplicationSpanIfNotInitialized();
160+
loadOlSparkListener();
161+
}
162+
163+
static void setupSparkConf(SparkConf sparkConf) {
164+
sparkConf.set("spark.openlineage.transport.type", "composite");
165+
sparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
166+
sparkConf.set("spark.openlineage.transport.transports.agent.type", "http");
167+
sparkConf.set("spark.openlineage.transport.transports.agent.url", getAgentHttpUrl());
168+
sparkConf.set("spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT);
169+
sparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
170+
}
154171

155-
log.info("Created datadog spark listener: {}", this.getClass().getSimpleName());
172+
/**
 * Propagates the Datadog trace id to the OpenLineage integration via OL run tags so
 * lineage events can be correlated with this Spark application's trace.
 *
 * NOTE(review): this method writes the tag key "_dd.intake.emit_spans" while
 * loadOlSparkListener() inlines the same logic with "_dd.ol_intake.emit_spans" — one of
 * the two keys looks wrong; confirm which one the intake honors and deduplicate.
 */
void setupTrace(SparkConf sc) {
  sc.set(
      "spark.openlineage.run.tags",
      "_dd.trace_id:"
          + applicationSpan.context().getTraceId().toString()
          + ";_dd.intake.emit_spans:false");
}
179+
180+
/**
 * Reflectively instantiates io.openlineage.spark.agent.OpenLineageSparkListener — if the
 * OpenLineage jar is reachable from any known classloader — configured to ship events
 * through the Datadog Agent, and stores it so spark events can be forwarded via notifyOl().
 * Missing OL on the classpath is expected and logged at info; instantiation failure is
 * best-effort and only warns, so tracing continues without OpenLineage.
 */
void loadOlSparkListener() {
  String className = "io.openlineage.spark.agent.OpenLineageSparkListener";
  Optional<Class> clazz = loadClass(className);
  if (!clazz.isPresent()) {
    // Not an error: OpenLineage support is optional.
    log.info("OpenLineage integration is not present on the classpath");
    return;
  }
  try {
    setupSparkConf(sparkConf);
    // Tag OL runs with our trace id so the backend can join OL events to this trace.
    // NOTE(review): tag key "_dd.ol_intake.emit_spans" here differs from
    // "_dd.intake.emit_spans" in setupTrace(); confirm which is correct and deduplicate —
    // this block duplicates setupTrace() instead of calling it.
    sparkConf.set(
        "spark.openlineage.run.tags",
        "_dd.trace_id:"
            + applicationSpan.context().getTraceId().toString()
            + ";_dd.ol_intake.emit_spans:false");

    // OL listener takes the (now agent-routing) SparkConf through its 1-arg constructor.
    openLineageSparkListener =
        (SparkListener)
            clazz.get().getDeclaredConstructor(SparkConf.class).newInstance(sparkConf);
    log.info(
        "Created OL spark listener: {}", openLineageSparkListener.getClass().getSimpleName());
  } catch (Exception e) {
    log.warn("Failed to instantiate OL Spark Listener: {}", e.toString());
  }
}
157204

158205
/** Resource name of the spark job. Provide an implementation based on a specific scala version */
@@ -176,6 +223,8 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
176223
@Override
177224
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
178225
this.applicationStart = applicationStart;
226+
initApplicationSpanIfNotInitialized();
227+
notifyOl(this.openLineageSparkListener::onApplicationStart, applicationStart);
179228
}
180229

181230
private void initApplicationSpanIfNotInitialized() {
@@ -233,6 +282,7 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
233282
log.info(
234283
"Received spark application end event, finish trace on this event: {}",
235284
finishTraceOnApplicationEnd);
285+
notifyOl(x -> openLineageSparkListener.onApplicationEnd(x), applicationEnd);
236286

237287
if (finishTraceOnApplicationEnd) {
238288
finishApplication(applicationEnd.time(), null, 0, null);
@@ -426,6 +476,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
426476
stageToJob.put(stageId, jobStart.jobId());
427477
}
428478
jobSpans.put(jobStart.jobId(), jobSpan);
479+
notifyOl(x -> openLineageSparkListener.onJobStart(x), jobStart);
429480
}
430481

431482
@Override
@@ -456,6 +507,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
456507
if (metrics != null) {
457508
metrics.setSpanMetrics(jobSpan);
458509
}
510+
notifyOl(x -> openLineageSparkListener.onJobEnd(x), jobEnd);
459511

460512
jobSpan.finish(jobEnd.time() * 1000);
461513
}
@@ -624,6 +676,8 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
624676

625677
Properties props = stageProperties.get(stageSpanKey);
626678
sendTaskSpan(stageSpan, taskEnd, props);
679+
680+
notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd);
627681
}
628682

629683
private void sendTaskSpan(
@@ -705,6 +759,15 @@ public void onOtherEvent(SparkListenerEvent event) {
705759
updateAdaptiveSQLPlan(event);
706760
}
707761

762+
private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
763+
if (this.openLineageSparkListener != null) {
764+
log.debug("Notifying with event `{}`", event.getClass().getCanonicalName());
765+
ol.accept(event);
766+
} else {
767+
log.debug("OpenLineageSparkListener is null");
768+
}
769+
}
770+
708771
private static final Class<?> adaptiveExecutionUpdateClass;
709772
private static final MethodHandle adaptiveExecutionIdMethod;
710773
private static final MethodHandle adaptiveSparkPlanMethod;
@@ -765,6 +828,7 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
765828
if (metrics != null) {
766829
metrics.setSpanMetrics(span);
767830
}
831+
notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlEnd);
768832

769833
span.finish(sqlEnd.time() * 1000);
770834
}
@@ -1260,9 +1324,71 @@ private static String getDatabricksRunName(SparkConf conf) {
12601324
return null;
12611325
}
12621326

1327+
private static String getAgentHttpUrl() {
1328+
StringBuilder sb =
1329+
new StringBuilder("http://")
1330+
.append(Config.get().getAgentHost())
1331+
.append(":")
1332+
.append(Config.get().getAgentPort());
1333+
return sb.toString();
1334+
}
1335+
12631336
@SuppressForbidden // called at most once per spark application
12641337
private static String removeUuidFromEndOfString(String input) {
12651338
return input.replaceAll(
12661339
"_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", "");
12671340
}
1341+
1342+
/**
 * Attempts to load {@code className} by trying, in order: (1) Class.forName with the
 * caller's classloader, (2) the context classloader of every live thread, (3) each of
 * those loaders' parents, and (4) the system classloader. Returns an empty Optional when
 * every attempt fails. Each failed attempt is logged at debug only — absence of the class
 * is an expected condition (optional OpenLineage dependency).
 */
private Optional<Class> loadClass(String className) {
  Class clazz = null;
  // Snapshot the context classloaders of all live threads; in YARN/cluster deployments
  // the OL jar may only be visible to a non-caller classloader.
  List<ClassLoader> availableClassloaders =
      Thread.getAllStackTraces().keySet().stream()
          .map(Thread::getContextClassLoader)
          .filter(Objects::nonNull)
          .collect(Collectors.toList());
  try {
    clazz = Class.forName(className);
  } catch (Exception e) {
    log.debug("Failed to load {} via Class.forName: {}", className, e.toString());
    // Fallback: probe each thread's context classloader, then its parent, stopping at
    // the first success. Exact attempt order matters and is preserved deliberately.
    for (ClassLoader classLoader : availableClassloaders) {
      try {
        clazz = classLoader.loadClass(className);
        log.debug("Loaded {} via classLoader: {}", className, classLoader);
        break;
      } catch (Exception ex) {
        log.debug(
            "Failed to load {} via loadClass via ClassLoader {} - {}",
            className,
            classLoader,
            ex.toString());
      }
      try {
        // A null parent (bootstrap loader) throws NPE here, which is caught below like
        // any other failure — the loop simply moves on.
        clazz = classLoader.getParent().loadClass(className);
        log.debug(
            "Loaded {} via parent classLoader: {} for CL {}",
            className,
            classLoader.getParent(),
            classLoader);
        break;
      } catch (Exception ex) {
        log.debug(
            "Failed to load {} via loadClass via parent ClassLoader {} - {}",
            className,
            classLoader.getParent(),
            ex.toString());
      }
    }
  }
  // Last resort: the system (application) classloader.
  if (clazz == null) {
    try {
      clazz = ClassLoader.getSystemClassLoader().loadClass(className);
      log.debug(
          "Loaded {} via system classLoader: {}", className, ClassLoader.getSystemClassLoader());
    } catch (Exception ex) {
      log.debug(
          "Failed to load {} via loadClass via SystemClassLoader {}", className, ex.toString());
    }
  }
  return Optional.ofNullable(clazz);
}
12681394
}

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import datadog.trace.agent.tooling.InstrumenterModule;
1111
import net.bytebuddy.asm.Advice;
1212
import org.apache.spark.deploy.SparkSubmitArguments;
13+
import org.apache.spark.scheduler.SparkListenerInterface;
1314

1415
public abstract class AbstractSparkInstrumentation extends InstrumenterModule.Tracing
1516
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
@@ -28,7 +29,8 @@ public String[] knownMatchingTypes() {
2829
return new String[] {
2930
"org.apache.spark.SparkContext",
3031
"org.apache.spark.deploy.SparkSubmit",
31-
"org.apache.spark.deploy.yarn.ApplicationMaster"
32+
"org.apache.spark.deploy.yarn.ApplicationMaster",
33+
"org.apache.spark.scheduler.LiveListenerBus",
3234
};
3335
}
3436

@@ -55,6 +57,14 @@ public void methodAdvice(MethodTransformer transformer) {
5557
.and(named("finish"))
5658
.and(isDeclaredBy(named("org.apache.spark.deploy.yarn.ApplicationMaster"))),
5759
AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice");
60+
61+
// LiveListenerBus class is used when running in a YARN cluster
62+
transformer.applyAdvice(
63+
isMethod()
64+
.and(named("addToSharedQueue"))
65+
.and(takesArgument(0, named("org.apache.spark.scheduler.SparkListenerInterface")))
66+
.and(isDeclaredBy(named("org.apache.spark.scheduler.LiveListenerBus"))),
67+
AbstractSparkInstrumentation.class.getName() + "$LiveListenerBusAdvice");
5868
}
5969

6070
public static class PrepareSubmitEnvAdvice {
@@ -100,4 +110,18 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
100110
}
101111
}
102112
}
113+
114+
public static class LiveListenerBusAdvice {
115+
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
116+
public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) {
117+
// Skip instantiating OpenLineage listener - we will inject it later with custom config
118+
if (listener == null || listener.getClass().getCanonicalName() == null) {
119+
return false;
120+
}
121+
return listener
122+
.getClass()
123+
.getCanonicalName()
124+
.equals("io.openlineage.spark.agent.OpenLineageSparkListener");
125+
}
126+
}
103127
}

0 commit comments

Comments
 (0)