Skip to content

Commit ca91262

Browse files
author
Hendrik Muhs
committed
[Transform] improve error handling of script errors (#48887)
improve error handling for script errors, treating it as irrecoverable errors which puts the task immediately into failed state, also improves the error extraction to properly report the script error. fixes #48467
1 parent 23a26ce commit ca91262

File tree

5 files changed

+286
-130
lines changed

5 files changed

+286
-130
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java

+2
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ public class TransformMessages {
7777
public static final String LOG_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE =
7878
"Insufficient memory for search after repeated page size reductions to [{0}], unable to continue pivot, "
7979
+ "please simplify job or increase heap size on data nodes.";
80+
public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR =
81+
"Failed to execute script with error: [{0}], stack trace: {1}";
8082

8183
public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS =
8284
"Failed to parse transform checkpoints for [{0}]";

x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java

+36-21
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,28 @@
2828
import static org.hamcrest.CoreMatchers.is;
2929
import static org.hamcrest.CoreMatchers.nullValue;
3030
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.matchesRegex;
3132

3233
public class TransformTaskFailedStateIT extends TransformRestTestCase {
3334

3435
private final List<String> failureTransforms = new ArrayList<>();
36+
3537
@Before
3638
public void setClusterSettings() throws IOException {
3739
// Make sure we never retry on failure to speed up the test
3840
// Set logging level to trace
3941
// see: https://github.com/elastic/elasticsearch/issues/45562
4042
Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings");
41-
addFailureRetrySetting.setJsonEntity(
42-
"{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \"" + 0 + "\"," +
43-
"\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam
44-
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
45-
"\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}");
43+
addFailureRetrySetting
44+
.setJsonEntity(
45+
"{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \""
46+
+ 0
47+
+ "\","
48+
+ "\"logger.org.elasticsearch.action.bulk\": \"info\","
49+
+ // reduces bulk failure spam
50+
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\","
51+
+ "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}"
52+
);
4653
client().performRequest(addFailureRetrySetting);
4754
}
4855

@@ -66,18 +73,22 @@ public void testForceStopFailedTransform() throws Exception {
6673
startDataframeTransform(transformId);
6774
awaitState(transformId, TransformStats.State.FAILED);
6875
Map<?, ?> fullState = getDataFrameState(transformId);
69-
final String failureReason = "task encountered more than 0 failures; latest failure: " +
70-
"Bulk index experienced failures. See the logs of the node running the transform for details.";
76+
final String failureReason = "task encountered more than 0 failures; latest failure: "
77+
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
7178
// Verify we have failed for the expected reason
72-
assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason));
79+
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));
7380

7481
// verify that we cannot stop a failed transform
7582
ResponseException ex = expectThrows(ResponseException.class, () -> stopTransform(transformId, false));
7683
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
77-
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
78-
equalTo("Unable to stop transform [test-force-stop-failed-transform] as it is in a failed state with reason [" +
79-
failureReason +
80-
"]. Use force stop to stop the transform."));
84+
assertThat(
85+
(String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
86+
matchesRegex(
87+
"Unable to stop transform \\[test-force-stop-failed-transform\\] as it is in a failed state with reason \\["
88+
+ failureReason
89+
+ "\\]. Use force stop to stop the transform."
90+
)
91+
);
8192

8293
// Verify that we can force stop a failed transform
8394
stopTransform(transformId, true);
@@ -97,20 +108,23 @@ public void testStartFailedTransform() throws Exception {
97108
startDataframeTransform(transformId);
98109
awaitState(transformId, TransformStats.State.FAILED);
99110
Map<?, ?> fullState = getDataFrameState(transformId);
100-
final String failureReason = "task encountered more than 0 failures; latest failure: " +
101-
"Bulk index experienced failures. See the logs of the node running the transform for details.";
111+
final String failureReason = "task encountered more than 0 failures; latest failure: "
112+
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
102113
// Verify we have failed for the expected reason
103-
assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason));
114+
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));
104115

105-
final String expectedFailure = "Unable to start transform [test-force-start-failed-transform] " +
106-
"as it is in a failed state with failure: [" + failureReason +
107-
"]. Use force stop and then restart the transform once error is resolved.";
116+
final String expectedFailure = "Unable to start transform \\[test-force-start-failed-transform\\] "
117+
+ "as it is in a failed state with failure: \\["
118+
+ failureReason
119+
+ "\\]. Use force stop and then restart the transform once error is resolved.";
108120
// Verify that we cannot start the transform when the task is in a failed state
109121
assertBusy(() -> {
110122
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId));
111123
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
112-
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
113-
equalTo(expectedFailure));
124+
assertThat(
125+
(String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
126+
matchesRegex(expectedFailure)
127+
);
114128
}, 60, TimeUnit.SECONDS);
115129

116130
stopTransform(transformId, true);
@@ -128,7 +142,8 @@ private void createDestinationIndexWithBadMapping(String indexName) throws IOExc
128142
try (XContentBuilder builder = jsonBuilder()) {
129143
builder.startObject();
130144
{
131-
builder.startObject("mappings")
145+
builder
146+
.startObject("mappings")
132147
.startObject("properties")
133148
.startObject("reviewer")
134149
.field("type", "long")

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

+49-63
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,16 @@
1313
import org.elasticsearch.ResourceNotFoundException;
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.index.IndexRequest;
16-
import org.elasticsearch.action.search.SearchPhaseExecutionException;
1716
import org.elasticsearch.action.search.SearchRequest;
1817
import org.elasticsearch.action.search.SearchResponse;
19-
import org.elasticsearch.action.search.ShardSearchFailure;
2018
import org.elasticsearch.action.support.IndicesOptions;
2119
import org.elasticsearch.common.Strings;
2220
import org.elasticsearch.common.breaker.CircuitBreakingException;
2321
import org.elasticsearch.common.xcontent.XContentBuilder;
2422
import org.elasticsearch.index.IndexNotFoundException;
2523
import org.elasticsearch.index.query.BoolQueryBuilder;
2624
import org.elasticsearch.index.query.QueryBuilder;
25+
import org.elasticsearch.script.ScriptException;
2726
import org.elasticsearch.search.aggregations.Aggregations;
2827
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
2928
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
@@ -45,6 +44,7 @@
4544
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
4645
import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils;
4746
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
47+
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;
4848

4949
import java.io.IOException;
5050
import java.io.UncheckedIOException;
@@ -66,7 +66,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
6666
* which query filters to run and which index requests to send
6767
*/
6868
private enum RunState {
69-
// do a complete query/index, this is used for batch data frames and for bootstraping (1st run)
69+
// do a complete query/index, this is used for batch transforms and for bootstrapping (1st run)
7070
FULL_RUN,
7171

7272
// Partial run modes in 2 stages:
@@ -422,7 +422,7 @@ protected IterationResult<TransformIndexerPosition> doProcess(SearchResponse sea
422422
default:
423423
// Any other state is a bug, should not happen
424424
logger.warn("[{}] Encountered unexpected run state [{}]", getJobId(), runState);
425-
throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]");
425+
throw new IllegalStateException("Transform indexer job encountered an illegal state [" + runState + "]");
426426
}
427427
}
428428

@@ -468,25 +468,36 @@ protected void onAbort() {
468468

469469
synchronized void handleFailure(Exception e) {
470470
logger.warn(new ParameterizedMessage("[{}] transform encountered an exception: ", getJobId()), e);
471-
if (handleCircuitBreakingException(e)) {
472-
return;
473-
}
474-
475-
if (isIrrecoverableFailure(e) || context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
476-
String failureMessage = isIrrecoverableFailure(e)
477-
? "task encountered irrecoverable failure: " + e.getMessage()
478-
: "task encountered more than " + context.getNumFailureRetries() + " failures; latest failure: " + e.getMessage();
479-
failIndexer(failureMessage);
471+
Throwable unwrappedException = ExceptionRootCauseFinder.getRootCauseException(e);
472+
473+
if (unwrappedException instanceof CircuitBreakingException) {
474+
handleCircuitBreakingException((CircuitBreakingException) unwrappedException);
475+
} else if (unwrappedException instanceof ScriptException) {
476+
handleScriptException((ScriptException) unwrappedException);
477+
// irrecoverable error without special handling
478+
} else if (unwrappedException instanceof IndexNotFoundException
479+
|| unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
480+
|| unwrappedException instanceof TransformConfigReloadingException) {
481+
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
482+
} else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
483+
failIndexer(
484+
"task encountered more than "
485+
+ context.getNumFailureRetries()
486+
+ " failures; latest failure: "
487+
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
488+
);
480489
} else {
481490
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
482491
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
483492
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
493+
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);
494+
484495
auditor
485496
.warning(
486497
getJobId(),
487-
"Transform encountered an exception: " + e.getMessage() + " Will attempt again at next scheduled trigger."
498+
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
488499
);
489-
lastAuditedExceptionMessage = e.getMessage();
500+
lastAuditedExceptionMessage = message;
490501
}
491502
}
492503
}
@@ -510,12 +521,6 @@ private void sourceHasChanged(ActionListener<Boolean> hasChangedListener) {
510521
}));
511522
}
512523

513-
private boolean isIrrecoverableFailure(Exception e) {
514-
return e instanceof IndexNotFoundException
515-
|| e instanceof AggregationResultUtils.AggregationExtractionException
516-
|| e instanceof TransformConfigReloadingException;
517-
}
518-
519524
private IterationResult<TransformIndexerPosition> processBuckets(final CompositeAggregation agg) {
520525
// we reached the end
521526
if (agg.getBuckets().isEmpty()) {
@@ -536,7 +541,7 @@ private IterationResult<TransformIndexerPosition> processBuckets(final Composite
536541
agg.getBuckets().isEmpty()
537542
);
538543

539-
// NOTE: progress is also mutated in ClientDataFrameIndexer#onFinished
544+
// NOTE: progress is also mutated in onFinish
540545
if (progress != null) {
541546
progress.incrementDocsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
542547
progress.incrementDocsIndexed(result.getToIndex().size());
@@ -671,7 +676,7 @@ protected SearchRequest buildSearchRequest() {
671676
default:
672677
// Any other state is a bug, should not happen
673678
logger.warn("Encountered unexpected run state [" + runState + "]");
674-
throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]");
679+
throw new IllegalStateException("Transform indexer job encountered an illegal state [" + runState + "]");
675680
}
676681

677682
searchRequest.source(sourceBuilder);
@@ -756,16 +761,9 @@ private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBu
756761
* Implementation details: We take the values from the circuit breaker as a hint, but
757762
* note that it breaks early, that's why we also reduce using
758763
*
759-
* @param e Exception thrown, only {@link CircuitBreakingException} are handled
760-
* @return true if exception was handled, false if not
764+
* @param circuitBreakingException CircuitBreakingException thrown
761765
*/
762-
protected boolean handleCircuitBreakingException(Exception e) {
763-
CircuitBreakingException circuitBreakingException = getCircuitBreakingException(e);
764-
765-
if (circuitBreakingException == null) {
766-
return false;
767-
}
768-
766+
private void handleCircuitBreakingException(CircuitBreakingException circuitBreakingException) {
769767
double reducingFactor = Math
770768
.min(
771769
(double) circuitBreakingException.getByteLimit() / circuitBreakingException.getBytesWanted(),
@@ -777,15 +775,29 @@ protected boolean handleCircuitBreakingException(Exception e) {
777775
if (newPageSize < MINIMUM_PAGE_SIZE) {
778776
String message = TransformMessages.getMessage(TransformMessages.LOG_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE, pageSize);
779777
failIndexer(message);
780-
return true;
778+
return;
781779
}
782780

783781
String message = TransformMessages.getMessage(TransformMessages.LOG_TRANSFORM_PIVOT_REDUCE_PAGE_SIZE, pageSize, newPageSize);
784782
auditor.info(getJobId(), message);
785-
logger.info("Data frame transform [" + getJobId() + "]:" + message);
786-
783+
logger.info("[{}] {}", getJobId(), message);
787784
pageSize = newPageSize;
788-
return true;
785+
return;
786+
}
787+
788+
/**
789+
* Handle script exception case. This is error is irrecoverable.
790+
*
791+
* @param scriptException ScriptException thrown
792+
*/
793+
private void handleScriptException(ScriptException scriptException) {
794+
String message = TransformMessages
795+
.getMessage(
796+
TransformMessages.LOG_TRANSFORM_PIVOT_SCRIPT_ERROR,
797+
scriptException.getDetailedMessage(),
798+
scriptException.getScriptStack()
799+
);
800+
failIndexer(message);
789801
}
790802

791803
protected void failIndexer(String failureMessage) {
@@ -818,7 +830,7 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) {
818830
}
819831

820832
private RunState determineRunStateAtStart() {
821-
// either 1st run or not a continuous data frame
833+
// either 1st run or not a continuous transform
822834
if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) {
823835
return RunState.FULL_RUN;
824836
}
@@ -832,32 +844,6 @@ private RunState determineRunStateAtStart() {
832844
return RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
833845
}
834846

835-
/**
836-
* Inspect exception for circuit breaking exception and return the first one it can find.
837-
*
838-
* @param e Exception
839-
* @return CircuitBreakingException instance if found, null otherwise
840-
*/
841-
private static CircuitBreakingException getCircuitBreakingException(Exception e) {
842-
// circuit breaking exceptions are at the bottom
843-
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(e);
844-
845-
if (unwrappedThrowable instanceof CircuitBreakingException) {
846-
return (CircuitBreakingException) unwrappedThrowable;
847-
} else if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
848-
SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) e;
849-
for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
850-
Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause());
851-
852-
if (unwrappedShardFailure instanceof CircuitBreakingException) {
853-
return (CircuitBreakingException) unwrappedShardFailure;
854-
}
855-
}
856-
}
857-
858-
return null;
859-
}
860-
861847
static class TransformConfigReloadingException extends ElasticsearchException {
862848
TransformConfigReloadingException(String msg, Throwable cause, Object... args) {
863849
super(msg, cause, args);

0 commit comments

Comments
 (0)