Skip to content

Commit e8762d2

Browse files
Spark job cancellation no longer marks application as failed (#8701)
Stop marking the Spark application as failed when the last Spark job was cancelled. Jira Ticket: DJMS-53
1 parent b00c24e commit e8762d2

File tree

2 files changed

+43
-4
lines changed

2 files changed

+43
-4
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,6 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
498498
return;
499499
}
500500

501-
lastJobFailed = false;
502501
if (jobEnd.jobResult() instanceof JobFailed) {
503502
JobFailed jobFailed = (JobFailed) jobEnd.jobResult();
504503
Exception exception = jobFailed.exception();
@@ -510,9 +509,15 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
510509
jobSpan.setErrorMessage(errorMessage);
511510
jobSpan.setTag(DDTags.ERROR_STACK, errorStackTrace);
512511
jobSpan.setTag(DDTags.ERROR_TYPE, "Spark Job Failed");
513-
lastJobFailed = true;
514-
lastJobFailedMessage = errorMessage;
515-
lastJobFailedStackTrace = errorStackTrace;
512+
513+
// Only propagate the error to the application if it is not a cancellation
514+
if (errorMessage != null && !errorMessage.toLowerCase().contains("cancelled")) {
515+
lastJobFailed = true;
516+
lastJobFailedMessage = errorMessage;
517+
lastJobFailedStackTrace = errorStackTrace;
518+
}
519+
} else {
520+
lastJobFailed = false;
516521
}
517522

518523
SparkAggregatedTaskMetrics metrics = jobMetrics.remove(jobEnd.jobId());

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

+34
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ abstract class AbstractSparkListenerTest extends AgentTestRunner {
121121
return new SparkListenerJobEnd(jobId, time, JobSucceeded$.MODULE$)
122122
}
123123

124+
protected jobFailedEvent(Integer jobId, Long time, String errorMessage) {
125+
def exception = new RuntimeException(errorMessage)
126+
def jobFailed = new org.apache.spark.scheduler.JobFailed(exception)
127+
return new SparkListenerJobEnd(jobId, time, jobFailed)
128+
}
129+
124130
protected stageSubmittedEvent(Integer stageId, Long time) {
125131
def stageInfo = createStageInfo(stageId)
126132
stageInfo.submissionTime = Option.apply(time)
@@ -457,6 +463,34 @@ abstract class AbstractSparkListenerTest extends AgentTestRunner {
457463
}
458464
}
459465

466+
def "test lastJobFailed is not set when job is cancelled"() {
467+
setup:
468+
def listener = getTestDatadogSparkListener()
469+
listener.onApplicationStart(applicationStartEvent(1000L))
470+
listener.onJobStart(jobStartEvent(1, 1900L, [1]))
471+
listener.onJobEnd(jobFailedEvent(1, 2200L, "Job was cancelled by user"))
472+
listener.onApplicationEnd(new SparkListenerApplicationEnd(2300L))
473+
474+
expect:
475+
assertTraces(1) {
476+
trace(2) {
477+
span {
478+
operationName "spark.application"
479+
resourceName "spark.application"
480+
spanType "spark"
481+
errored false
482+
parent()
483+
}
484+
span {
485+
operationName "spark.job"
486+
spanType "spark"
487+
errored true
488+
childOf(span(0))
489+
}
490+
}
491+
}
492+
}
493+
460494
protected validateRelativeError(double value, double expected, double relativeAccuracy) {
461495
double relativeError = Math.abs(value - expected) / expected
462496
assert relativeError < relativeAccuracy

0 commit comments

Comments
 (0)