
Commit d36e910

set OL if detected
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent ca6895a commit d36e910

File tree: 2 files changed, +157 -3 lines changed


dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 128 additions & 2 deletions
@@ -33,9 +33,12 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.spark.ExceptionFailure;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskFailedReason;
@@ -74,6 +77,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   private final int MAX_COLLECTION_SIZE = 5000;
   private final int MAX_ACCUMULATOR_SIZE = 50000;
   private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags.";
+  private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage";

   private final SparkConf sparkConf;
   private final String sparkVersion;
@@ -123,6 +127,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   private long availableExecutorTime = 0;

   private volatile boolean applicationEnded = false;
+  private SparkListener openLineageSparkListener = null;

   public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sparkVersion) {
     tracer = AgentTracer.get();
@@ -151,8 +156,50 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
                   finishApplication(System.currentTimeMillis(), null, 0, null);
                 }
               }));
+    initApplicationSpanIfNotInitialized();
+    loadOlSparkListener();
+  }
+
+  static void setupSparkConf(SparkConf sparkConf) {
+    sparkConf.set("spark.openlineage.transport.type", "composite");
+    sparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
+    sparkConf.set("spark.openlineage.transport.transports.agent.type", "http");
+    sparkConf.set("spark.openlineage.transport.transports.agent.url", getAgentHttpUrl());
+    sparkConf.set("spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT);
+    sparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
+  }

-    log.info("Created datadog spark listener: {}", this.getClass().getSimpleName());
+  void setupTrace(SparkConf sc) {
+    sc.set(
+        "spark.openlineage.run.tags",
+        "_dd.trace_id:"
+            + applicationSpan.context().getTraceId().toString()
+            + ";_dd.intake.emit_spans:false");
+  }
+
+  void loadOlSparkListener() {
+    String className = "io.openlineage.spark.agent.OpenLineageSparkListener";
+    Optional<Class> clazz = loadClass(className);
+    if (!clazz.isPresent()) {
+      log.info("OpenLineage integration is not present on the classpath");
+      return;
+    }
+    try {
+      setupSparkConf(sparkConf);
+      sparkConf.set(
+          "spark.openlineage.run.tags",
+          "_dd.trace_id:"
+              + applicationSpan.context().getTraceId().toString()
+              + ";_dd.ol_intake.emit_spans:false");
+
+      openLineageSparkListener =
+          (SparkListener)
+              clazz.get().getDeclaredConstructor(SparkConf.class).newInstance(sparkConf);
+      log.info(
+          "Created OL spark listener: {}", openLineageSparkListener.getClass().getSimpleName());
+    } catch (Exception e) {
+      log.warn("Failed to instantiate OL Spark Listener: {}", e.toString());
+    }
   }

   /** Resource name of the spark job. Provide an implementation based on a specific scala version */
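
For orientation, here is a minimal, self-contained sketch (not part of the commit) of the net configuration that setupSparkConf injects. The keys are copied verbatim from the diff; the agent URL is an illustrative placeholder for what getAgentHttpUrl() builds from Config.get().getAgentHost() and Config.get().getAgentPort(), and the comments reflect my reading of the OpenLineage composite-transport settings.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class OlTransportConfSketch {
      public static void main(String[] args) {
        // Illustrative only: the real URL comes from Config.get() at runtime.
        String agentUrl = "http://localhost:8126";

        Map<String, String> conf = new LinkedHashMap<>();
        // Composite transport: fans each OpenLineage event out to every child
        // transport; continueOnFailure=true keeps emitting if one child fails.
        conf.put("spark.openlineage.transport.type", "composite");
        conf.put("spark.openlineage.transport.continueOnFailure", "true");
        // One child transport, named "agent": gzip-compressed HTTP POSTs to the
        // Datadog agent's OpenLineage endpoint (AGENT_OL_ENDPOINT above).
        conf.put("spark.openlineage.transport.transports.agent.type", "http");
        conf.put("spark.openlineage.transport.transports.agent.url", agentUrl);
        conf.put(
            "spark.openlineage.transport.transports.agent.endpoint",
            "openlineage/api/v1/lineage");
        conf.put("spark.openlineage.transport.transports.agent.compression", "gzip");

        conf.forEach((k, v) -> System.out.println(k + "=" + v));
      }
    }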
@@ -176,6 +223,8 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
   @Override
   public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
     this.applicationStart = applicationStart;
+    initApplicationSpanIfNotInitialized();
+    notifyOl(this.openLineageSparkListener::onApplicationStart, applicationStart);
   }

   private void initApplicationSpanIfNotInitialized() {
@@ -233,6 +282,7 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
     log.info(
         "Received spark application end event, finish trace on this event: {}",
         finishTraceOnApplicationEnd);
+    notifyOl(this.openLineageSparkListener::onApplicationEnd, applicationEnd);

     if (finishTraceOnApplicationEnd) {
       finishApplication(applicationEnd.time(), null, 0, null);
@@ -426,6 +476,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
       stageToJob.put(stageId, jobStart.jobId());
     }
     jobSpans.put(jobStart.jobId(), jobSpan);
+    notifyOl(this.openLineageSparkListener::onJobStart, jobStart);
   }

   @Override
@@ -456,6 +507,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
     if (metrics != null) {
       metrics.setSpanMetrics(jobSpan);
     }
+    notifyOl(this.openLineageSparkListener::onJobEnd, jobEnd);

     jobSpan.finish(jobEnd.time() * 1000);
   }
@@ -624,6 +676,8 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {

     Properties props = stageProperties.get(stageSpanKey);
     sendTaskSpan(stageSpan, taskEnd, props);
+
+    notifyOl(this.openLineageSparkListener::onTaskEnd, taskEnd);
   }

   private void sendTaskSpan(
@@ -705,6 +759,15 @@ public void onOtherEvent(SparkListenerEvent event) {
       updateAdaptiveSQLPlan(event);
     }

+  private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
+    if (this.openLineageSparkListener != null) {
+      log.debug("Notifying with event `{}`", event.getClass().getCanonicalName());
+      ol.accept(event);
+    } else {
+      log.debug("OpenLineageSparkListener is null");
+    }
+  }
+
   private static final Class<?> adaptiveExecutionUpdateClass;
   private static final MethodHandle adaptiveExecutionIdMethod;
   private static final MethodHandle adaptiveSparkPlanMethod;
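
A Java subtlety worth flagging at the notifyOl call sites above: a bound method reference such as this.openLineageSparkListener::onJobStart evaluates its receiver at creation time, so it throws NullPointerException before notifyOl is ever entered when the listener is null; the null guard inside notifyOl only protects consumers that are built lazily. A minimal sketch with hypothetical names (Listener, delegate) illustrating both variants:

    import java.util.function.Consumer;

    public class NotifyDemo {
      interface Listener {
        void onEvent(String event);
      }

      // Stands in for openLineageSparkListener; may legitimately be null.
      static Listener delegate = null;

      // Same shape as notifyOl: dispatch only when the delegate exists.
      static void notify(Consumer<String> target, String event) {
        if (delegate != null) {
          target.accept(event);
        }
      }

      public static void main(String[] args) {
        // Safe: the lambda reads `delegate` only when invoked, and the guard
        // in notify() prevents invocation while it is null.
        notify(e -> delegate.onEvent(e), "job-start");

        // Unsafe: a bound method reference dereferences `delegate` right here,
        // throwing NullPointerException before notify() is entered:
        // notify(delegate::onEvent, "job-start");
      }
    }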
@@ -765,6 +828,7 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
     if (metrics != null) {
       metrics.setSpanMetrics(span);
     }
+    notifyOl(this.openLineageSparkListener::onOtherEvent, sqlEnd);

     span.finish(sqlEnd.time() * 1000);
   }
@@ -923,7 +987,7 @@ private void setDataJobsSamplingPriority(AgentSpan span) {

   private AgentTracer.SpanBuilder buildSparkSpan(String spanName, Properties properties) {
     AgentTracer.SpanBuilder builder =
-        tracer.buildSpan(spanName).withSpanType("spark").withTag("app_id", appId);
+        tracer.buildSpan("spark", spanName).withSpanType("spark").withTag("app_id", appId);

     if (databricksServiceName != null) {
       builder.withServiceName(databricksServiceName);
@@ -1260,9 +1324,71 @@ private static String getDatabricksRunName(SparkConf conf) {
     return null;
   }

+  private static String getAgentHttpUrl() {
+    StringBuilder sb =
+        new StringBuilder("http://")
+            .append(Config.get().getAgentHost())
+            .append(":")
+            .append(Config.get().getAgentPort());
+    return sb.toString();
+  }
+
   @SuppressForbidden // called at most once per spark application
   private static String removeUuidFromEndOfString(String input) {
     return input.replaceAll(
         "_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", "");
   }
+
+  private Optional<Class> loadClass(String className) {
+    Class clazz = null;
+    List<ClassLoader> availableClassloaders =
+        Thread.getAllStackTraces().keySet().stream()
+            .map(Thread::getContextClassLoader)
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
+    try {
+      clazz = Class.forName(className);
+    } catch (Exception e) {
+      log.debug("Failed to load {} via Class.forName: {}", className, e.toString());
+      for (ClassLoader classLoader : availableClassloaders) {
+        try {
+          clazz = classLoader.loadClass(className);
+          log.debug("Loaded {} via classLoader: {}", className, classLoader);
+          break;
+        } catch (Exception ex) {
+          log.debug(
+              "Failed to load {} via loadClass via ClassLoader {} - {}",
+              className,
+              classLoader,
+              ex.toString());
+        }
+        try {
+          clazz = classLoader.getParent().loadClass(className);
+          log.debug(
+              "Loaded {} via parent classLoader: {} for CL {}",
+              className,
+              classLoader.getParent(),
+              classLoader);
+          break;
+        } catch (Exception ex) {
+          log.debug(
+              "Failed to load {} via loadClass via parent ClassLoader {} - {}",
+              className,
+              classLoader.getParent(),
+              ex.toString());
+        }
+      }
+    }
+    if (clazz == null) {
+      try {
+        clazz = ClassLoader.getSystemClassLoader().loadClass(className);
+        log.debug(
+            "Loaded {} via system classLoader: {}", className, ClassLoader.getSystemClassLoader());
+      } catch (Exception ex) {
+        log.debug(
+            "Failed to load {} via loadClass via SystemClassLoader {}", className, ex.toString());
+      }
+    }
+    return Optional.ofNullable(clazz);
+  }
 }
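
The loadClass helper above tries Class.forName first, then every live thread's context classloader and that loader's parent, and finally the system classloader — presumably because the tracer's own classloader cannot always see classes shipped with the user's Spark job. A condensed, runnable sketch of the same fallback chain (logging and the parent-classloader step omitted for brevity):

    import java.util.List;
    import java.util.Objects;
    import java.util.Optional;
    import java.util.stream.Collectors;

    public class ClassLookupSketch {
      static Optional<Class<?>> find(String name) {
        // 1. The classloader of the current class.
        try {
          return Optional.of(Class.forName(name));
        } catch (ClassNotFoundException ignored) {
        }
        // 2. Every live thread's context classloader.
        List<ClassLoader> loaders =
            Thread.getAllStackTraces().keySet().stream()
                .map(Thread::getContextClassLoader)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        for (ClassLoader cl : loaders) {
          try {
            return Optional.of(cl.loadClass(name));
          } catch (ClassNotFoundException ignored) {
          }
        }
        // 3. The system classloader as a last resort.
        try {
          return Optional.of(ClassLoader.getSystemClassLoader().loadClass(name));
        } catch (ClassNotFoundException ignored) {
        }
        return Optional.empty();
      }

      public static void main(String[] args) {
        System.out.println(find("java.util.ArrayList").isPresent()); // true
        System.out.println(find("com.example.Missing").isPresent()); // false
      }
    }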

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 29 additions & 1 deletion
@@ -10,10 +10,15 @@
 import datadog.trace.agent.tooling.InstrumenterModule;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.deploy.SparkSubmitArguments;
+import org.apache.spark.scheduler.SparkListenerInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public abstract class AbstractSparkInstrumentation extends InstrumenterModule.Tracing
     implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {

+  private static final Logger log = LoggerFactory.getLogger(AbstractSparkInstrumentation.class);
+
   public AbstractSparkInstrumentation() {
     super("spark", "apache-spark");
   }
@@ -28,7 +33,8 @@ public String[] knownMatchingTypes() {
     return new String[] {
       "org.apache.spark.SparkContext",
       "org.apache.spark.deploy.SparkSubmit",
-      "org.apache.spark.deploy.yarn.ApplicationMaster"
+      "org.apache.spark.deploy.yarn.ApplicationMaster",
+      "org.apache.spark.scheduler.LiveListenerBus",
     };
   }

@@ -55,6 +61,14 @@ public void methodAdvice(MethodTransformer transformer) {
             .and(named("finish"))
             .and(isDeclaredBy(named("org.apache.spark.deploy.yarn.ApplicationMaster"))),
         AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice");
+
+    // LiveListenerBus class is used when running in a YARN cluster
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("addToSharedQueue"))
+            .and(takesArgument(0, named("org.apache.spark.scheduler.SparkListenerInterface")))
+            .and(isDeclaredBy(named("org.apache.spark.scheduler.LiveListenerBus"))),
+        AbstractSparkInstrumentation.class.getName() + "$LiveListenerBusAdvice");
   }

   public static class PrepareSubmitEnvAdvice {
@@ -100,4 +114,18 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
       }
     }
   }
+
+  public static class LiveListenerBusAdvice {
+    @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
+    public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) {
+      // Skip instantiating OpenLineage listener - we will inject it later with custom config
+      if (listener == null || listener.getClass().getCanonicalName() == null) {
+        return false;
+      }
+      return listener
+          .getClass()
+          .getCanonicalName()
+          .equals("io.openlineage.spark.agent.OpenLineageSparkListener");
+    }
+  }
 }
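
For readers not fluent in ByteBuddy advice: with skipOn = Advice.OnNonDefaultValue.class, a true return from enter (a non-default value for boolean) causes the instrumented addToSharedQueue body to be skipped, so a user-registered OpenLineageSparkListener is dropped and the agent can later register its own instance carrying the Datadog transport configuration. A standalone restatement (not from the commit) of the predicate, with a hypothetical name shouldSkipRegistration:

    public class SkipDecisionSketch {
      static boolean shouldSkipRegistration(Object listener) {
        // getCanonicalName() returns null for anonymous and local classes,
        // hence the guard before the string comparison.
        if (listener == null || listener.getClass().getCanonicalName() == null) {
          return false; // let Spark register the listener normally
        }
        // Only the OpenLineage listener is intercepted; all others pass through.
        return "io.openlineage.spark.agent.OpenLineageSparkListener"
            .equals(listener.getClass().getCanonicalName());
      }

      public static void main(String[] args) {
        System.out.println(shouldSkipRegistration(new Object())); // false
        System.out.println(shouldSkipRegistration(null)); // false
      }
    }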
