Skip to content

Commit a6bad06

Browse files
[7.x][ML] Report cause when datafeed extraction encounters error (#66167) (#66277)
When a datafeed encounters errors extracting data, the error is often an instance of `SearchPhaseExecutionException`. In that case the top-level error message is `Partial shards failure`, which is not very informative. This commit moves a transform utility method out of `ExceptionRootCauseFinder` (it unwraps exceptions, with special handling for `SearchPhaseExecutionException`) and uses it from the datafeed `ProblemTracker` in order to provide a more useful error message. Backport of #66167
1 parent f1a709c commit a6bad06

File tree

10 files changed

+144
-69
lines changed

10 files changed

+144
-69
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.ElasticsearchStatusException;
1010
import org.elasticsearch.ResourceAlreadyExistsException;
1111
import org.elasticsearch.ResourceNotFoundException;
12+
import org.elasticsearch.action.search.SearchPhaseExecutionException;
1213
import org.elasticsearch.action.search.ShardSearchFailure;
1314
import org.elasticsearch.common.ParseField;
1415
import org.elasticsearch.rest.RestStatus;
@@ -112,7 +113,36 @@ public static <T> T requireNonNull(T obj, ParseField paramName) {
112113
return requireNonNull(obj, paramName.getPreferredName());
113114
}
114115

116+
/**
117+
* @see org.elasticsearch.ExceptionsHelper#unwrapCause(Throwable)
118+
*/
115119
public static Throwable unwrapCause(Throwable t) {
116-
return org.elasticsearch.ExceptionsHelper.unwrapCause(t);
120+
return org.elasticsearch.ExceptionsHelper.unwrapCause(t);
121+
}
122+
123+
/**
124+
* Unwrap the exception stack and return the most likely cause.
125+
* This method has special handling for {@link SearchPhaseExecutionException}
126+
* where it returns the cause of the first shard failure that unwraps to an {@code ElasticsearchException}.
127+
*
128+
* @param t raw Throwable
129+
* @return unwrapped throwable if possible
130+
*/
131+
public static Throwable findSearchExceptionRootCause(Throwable t) {
132+
// circuit breaking exceptions are at the bottom
133+
Throwable unwrappedThrowable = unwrapCause(t);
134+
135+
if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
136+
SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) unwrappedThrowable;
137+
for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
138+
Throwable unwrappedShardFailure = unwrapCause(shardFailure.getCause());
139+
140+
if (unwrappedShardFailure instanceof ElasticsearchException) {
141+
return unwrappedShardFailure;
142+
}
143+
}
144+
}
145+
146+
return unwrappedThrowable;
117147
}
118148
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/utils/ExceptionsHelper.java

+7
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,11 @@ public static <T> T requireNonNull(T obj, String paramName) {
1919
}
2020
return obj;
2121
}
22+
23+
/**
24+
* @see org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper#findSearchExceptionRootCause(Throwable)
25+
*/
26+
public static Throwable findSearchExceptionRootCause(Throwable t) {
27+
return org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper.findSearchExceptionRootCause(t);
28+
}
2229
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.ml.utils;
8+
9+
import org.elasticsearch.ElasticsearchException;
10+
import org.elasticsearch.action.search.SearchPhaseExecutionException;
11+
import org.elasticsearch.action.search.ShardSearchFailure;
12+
import org.elasticsearch.indices.IndexCreationException;
13+
import org.elasticsearch.test.ESTestCase;
14+
15+
import static org.hamcrest.Matchers.equalTo;
16+
import static org.hamcrest.Matchers.sameInstance;
17+
18+
public class ExceptionsHelperTests extends ESTestCase {
19+
20+
public void testFindSearchExceptionRootCause_GivenWrappedSearchPhaseException() {
21+
SearchPhaseExecutionException searchPhaseExecutionException = new SearchPhaseExecutionException("test-phase",
22+
"partial shards failure", new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchException("for the cause!")) });
23+
24+
Throwable rootCauseException = ExceptionsHelper.findSearchExceptionRootCause(
25+
new IndexCreationException("test-index", searchPhaseExecutionException));
26+
27+
assertThat(rootCauseException.getMessage(), equalTo("for the cause!"));
28+
}
29+
30+
public void testFindSearchExceptionRootCause_GivenRuntimeException() {
31+
RuntimeException runtimeException = new RuntimeException("nothing to unwrap here");
32+
assertThat(ExceptionsHelper.findSearchExceptionRootCause(runtimeException), sameInstance(runtimeException));
33+
}
34+
35+
public void testFindSearchExceptionRootCause_GivenWrapperException() {
36+
RuntimeException runtimeException = new RuntimeException("cause");
37+
38+
Throwable rootCauseException = ExceptionsHelper.findSearchExceptionRootCause(
39+
new IndexCreationException("test-index", runtimeException));
40+
41+
assertThat(rootCauseException.getMessage(), equalTo("cause"));
42+
}
43+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.apache.logging.log4j.message.ParameterizedMessage;
11+
import org.elasticsearch.ElasticsearchException;
1012
import org.elasticsearch.ElasticsearchStatusException;
13+
import org.elasticsearch.ElasticsearchWrapperException;
1114
import org.elasticsearch.client.Client;
1215
import org.elasticsearch.common.collect.Tuple;
1316
import org.elasticsearch.common.io.Streams;
@@ -318,7 +321,7 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro
318321
try {
319322
extractedData = dataExtractor.next();
320323
} catch (Exception e) {
321-
LOGGER.debug("[" + jobId + "] error while extracting data", e);
324+
LOGGER.error(new ParameterizedMessage("[{}] error while extracting data", jobId), e);
322325
// When extraction problems are encountered, we do not want to advance time.
323326
// Instead, it is preferable to retry the given interval next time an extraction
324327
// is triggered.
@@ -350,7 +353,7 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro
350353
if (isIsolated) {
351354
return;
352355
}
353-
LOGGER.debug("[" + jobId + "] error while posting data", e);
356+
LOGGER.error(new ParameterizedMessage("[{}] error while posting data", jobId), e);
354357

355358
// a conflict exception means the job state is not open any more.
356359
// we should therefore stop the datafeed.
@@ -469,7 +472,7 @@ Long lastEndTimeMs() {
469472
return lastEndTimeMs;
470473
}
471474

472-
static class AnalysisProblemException extends RuntimeException {
475+
static class AnalysisProblemException extends ElasticsearchException implements ElasticsearchWrapperException {
473476

474477
final boolean shouldStop;
475478
final long nextDelayInMsSinceEpoch;
@@ -481,7 +484,7 @@ static class AnalysisProblemException extends RuntimeException {
481484
}
482485
}
483486

484-
static class ExtractionProblemException extends RuntimeException {
487+
static class ExtractionProblemException extends ElasticsearchException implements ElasticsearchWrapperException {
485488

486489
final long nextDelayInMsSinceEpoch;
487490

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,12 @@ protected void doRun() {
184184
if (endTime == null) {
185185
next = e.nextDelayInMsSinceEpoch;
186186
}
187-
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
187+
holder.problemTracker.reportExtractionProblem(e);
188188
} catch (DatafeedJob.AnalysisProblemException e) {
189189
if (endTime == null) {
190190
next = e.nextDelayInMsSinceEpoch;
191191
}
192-
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
192+
holder.problemTracker.reportAnalysisProblem(e);
193193
if (e.shouldStop) {
194194
holder.stop("lookback_analysis_error", TimeValue.timeValueSeconds(20), e);
195195
return;
@@ -241,10 +241,10 @@ protected void doRun() {
241241
holder.problemTracker.reportNonEmptyDataCount();
242242
} catch (DatafeedJob.ExtractionProblemException e) {
243243
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
244-
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
244+
holder.problemTracker.reportExtractionProblem(e);
245245
} catch (DatafeedJob.AnalysisProblemException e) {
246246
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
247-
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
247+
holder.problemTracker.reportAnalysisProblem(e);
248248
if (e.shouldStop) {
249249
holder.stop("realtime_analysis_error", TimeValue.timeValueSeconds(20), e);
250250
return;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.ml.datafeed;
77

88
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
9+
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
910
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
1011

1112
import java.util.Objects;
@@ -42,19 +43,19 @@ class ProblemTracker {
4243
/**
4344
* Reports an analysis problem if it is different from the last seen problem
4445
*
45-
* @param problemMessage the problem message
46+
* @param error the exception
4647
*/
47-
public void reportAnalysisProblem(String problemMessage) {
48-
reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR, problemMessage);
48+
public void reportAnalysisProblem(DatafeedJob.AnalysisProblemException error) {
49+
reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR, ExceptionsHelper.unwrapCause(error).getMessage());
4950
}
5051

5152
/**
5253
* Reports an extraction problem if it is different from the last seen problem
5354
*
54-
* @param problemMessage the problem message
55+
* @param error the exception
5556
*/
56-
public void reportExtractionProblem(String problemMessage) {
57-
reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_EXTRACTION_ERROR, problemMessage);
57+
public void reportExtractionProblem(DatafeedJob.ExtractionProblemException error) {
58+
reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_EXTRACTION_ERROR, ExceptionsHelper.findSearchExceptionRootCause(error).getMessage());
5859
}
5960

6061
/**

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java

+44-11
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
*/
66
package org.elasticsearch.xpack.ml.datafeed;
77

8+
import org.elasticsearch.ElasticsearchException;
9+
import org.elasticsearch.ElasticsearchWrapperException;
10+
import org.elasticsearch.action.search.SearchPhaseExecutionException;
11+
import org.elasticsearch.action.search.ShardSearchFailure;
812
import org.elasticsearch.test.ESTestCase;
913
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
1014
import org.junit.Before;
@@ -27,33 +31,43 @@ public void setUpTests() {
2731
}
2832

2933
public void testReportExtractionProblem() {
30-
problemTracker.reportExtractionProblem("foo");
34+
problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));
3135

32-
verify(auditor).error("foo", "Datafeed is encountering errors extracting data: foo");
36+
verify(auditor).error("foo", "Datafeed is encountering errors extracting data: cause");
37+
assertTrue(problemTracker.hasProblems());
38+
}
39+
40+
public void testReportExtractionProblem_GivenSearchPhaseExecutionException() {
41+
SearchPhaseExecutionException searchPhaseExecutionException = new SearchPhaseExecutionException("test-phase",
42+
"partial shards failure", new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchException("for the cause!")) });
43+
44+
problemTracker.reportExtractionProblem(new DatafeedJob.ExtractionProblemException(0L, searchPhaseExecutionException));
45+
46+
verify(auditor).error("foo", "Datafeed is encountering errors extracting data: for the cause!");
3347
assertTrue(problemTracker.hasProblems());
3448
}
3549

3650
public void testReportAnalysisProblem() {
37-
problemTracker.reportAnalysisProblem("foo");
51+
problemTracker.reportAnalysisProblem(createAnalysisProblem("top level", "cause"));
3852

39-
verify(auditor).error("foo", "Datafeed is encountering errors submitting data for analysis: foo");
53+
verify(auditor).error("foo", "Datafeed is encountering errors submitting data for analysis: cause");
4054
assertTrue(problemTracker.hasProblems());
4155
}
4256

4357
public void testReportProblem_GivenSameProblemTwice() {
44-
problemTracker.reportExtractionProblem("foo");
45-
problemTracker.reportAnalysisProblem("foo");
58+
problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));
59+
problemTracker.reportAnalysisProblem(createAnalysisProblem("top level", "cause"));
4660

47-
verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: foo");
61+
verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: cause");
4862
assertTrue(problemTracker.hasProblems());
4963
}
5064

5165
public void testReportProblem_GivenSameProblemAfterFinishReport() {
52-
problemTracker.reportExtractionProblem("foo");
66+
problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));
5367
problemTracker.finishReport();
54-
problemTracker.reportExtractionProblem("foo");
68+
problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));
5569

56-
verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: foo");
70+
verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: cause");
5771
assertTrue(problemTracker.hasProblems());
5872
}
5973

@@ -108,12 +122,31 @@ public void testFinishReport_GivenNoProblems() {
108122
}
109123

110124
public void testFinishReport_GivenRecovery() {
111-
problemTracker.reportExtractionProblem("bar");
125+
problemTracker.reportExtractionProblem(createExtractionProblem("top level", "bar"));
112126
problemTracker.finishReport();
113127
problemTracker.finishReport();
114128

115129
verify(auditor).error("foo", "Datafeed is encountering errors extracting data: bar");
116130
verify(auditor).info("foo", "Datafeed has recovered data extraction and analysis");
117131
assertFalse(problemTracker.hasProblems());
118132
}
133+
134+
private static DatafeedJob.ExtractionProblemException createExtractionProblem(String error, String cause) {
135+
Exception causeException = new RuntimeException(cause);
136+
Exception wrappedException = new TestWrappedException(error, causeException);
137+
return new DatafeedJob.ExtractionProblemException(0L, wrappedException);
138+
}
139+
140+
private static DatafeedJob.AnalysisProblemException createAnalysisProblem(String error, String cause) {
141+
Exception causeException = new RuntimeException(cause);
142+
Exception wrappedException = new TestWrappedException(error, causeException);
143+
return new DatafeedJob.AnalysisProblemException(0L, false, wrappedException);
144+
}
145+
146+
private static class TestWrappedException extends RuntimeException implements ElasticsearchWrapperException {
147+
148+
TestWrappedException(String message, Throwable cause) {
149+
super(message, cause);
150+
}
151+
}
119152
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ synchronized void stopAndSaveState() {
631631

632632
synchronized void handleFailure(Exception e) {
633633
logger.warn(new ParameterizedMessage("[{}] transform encountered an exception: ", getJobId()), e);
634-
Throwable unwrappedException = ExceptionRootCauseFinder.getRootCauseException(e);
634+
Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
635635

636636
if (unwrappedException instanceof CircuitBreakingException) {
637637
handleCircuitBreakingException((CircuitBreakingException) unwrappedException);

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java

-26
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88

99
import org.elasticsearch.ElasticsearchException;
1010
import org.elasticsearch.action.bulk.BulkItemResponse;
11-
import org.elasticsearch.action.search.SearchPhaseExecutionException;
12-
import org.elasticsearch.action.search.ShardSearchFailure;
1311
import org.elasticsearch.rest.RestStatus;
1412

1513
import java.util.Arrays;
@@ -38,30 +36,6 @@ public final class ExceptionRootCauseFinder {
3836
)
3937
);
4038

41-
/**
42-
* Unwrap the exception stack and return the most likely cause.
43-
*
44-
* @param t raw Throwable
45-
* @return unwrapped throwable if possible
46-
*/
47-
public static Throwable getRootCauseException(Throwable t) {
48-
// circuit breaking exceptions are at the bottom
49-
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(t);
50-
51-
if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
52-
SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) unwrappedThrowable;
53-
for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
54-
Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause());
55-
56-
if (unwrappedShardFailure instanceof ElasticsearchException) {
57-
return unwrappedShardFailure;
58-
}
59-
}
60-
}
61-
62-
return t;
63-
}
64-
6539
/**
6640
* Return the best error message possible given an already unwrapped exception.
6741
*

0 commit comments

Comments
 (0)