Skip to content

Commit 12a1f44

Browse files
author
Hendrik Muhs
authored
[Transform] improve irrecoverable error detection
Treat resource-not-found and illegal-argument exceptions as irrecoverable errors. Relates to #50135.
1 parent 84dd9dc commit 12a1f44

File tree

3 files changed

+31
-25
lines changed

3 files changed

+31
-25
lines changed

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
174174
// Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments
175175
// the indexing failure counter
176176
// and possibly retries)
177-
Exception irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(
177+
Throwable irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(
178178
deduplicatedFailures.values()
179179
);
180180
if (irrecoverableException == null) {
@@ -373,7 +373,7 @@ private static String getBulkIndexDetailedFailureMessage(String prefix, Map<Stri
373373
return failureMessage;
374374
}
375375

376-
private static Exception decorateBulkIndexException(Exception irrecoverableException) {
376+
private static Throwable decorateBulkIndexException(Throwable irrecoverableException) {
377377
if (irrecoverableException instanceof MapperParsingException) {
378378
return new TransformException(
379379
"Destination index mappings are incompatible with the transform configuration.",

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

+22-20
Original file line numberDiff line numberDiff line change
@@ -484,28 +484,30 @@ synchronized void handleFailure(Exception e) {
484484
handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException);
485485
} else if (unwrappedException instanceof IndexNotFoundException
486486
|| unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
487-
|| unwrappedException instanceof TransformConfigReloadingException) {
488-
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
489-
} else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
490-
failIndexer(
491-
"task encountered more than "
492-
+ context.getNumFailureRetries()
493-
+ " failures; latest failure: "
494-
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
495-
);
496-
} else {
497-
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
498-
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
499-
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
500-
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);
501-
502-
auditor.warning(
503-
getJobId(),
504-
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
487+
|| unwrappedException instanceof TransformConfigReloadingException
488+
|| unwrappedException instanceof ResourceNotFoundException
489+
|| unwrappedException instanceof IllegalArgumentException) {
490+
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
491+
} else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
492+
failIndexer(
493+
"task encountered more than "
494+
+ context.getNumFailureRetries()
495+
+ " failures; latest failure: "
496+
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
505497
);
506-
lastAuditedExceptionMessage = message;
498+
} else {
499+
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
500+
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
501+
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
502+
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);
503+
504+
auditor.warning(
505+
getJobId(),
506+
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
507+
);
508+
lastAuditedExceptionMessage = message;
509+
}
507510
}
508-
}
509511
}
510512

511513
/**

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.elasticsearch.xpack.transform.utils;
88

99
import org.elasticsearch.ElasticsearchException;
10+
import org.elasticsearch.ResourceNotFoundException;
1011
import org.elasticsearch.action.bulk.BulkItemResponse;
1112
import org.elasticsearch.action.search.SearchPhaseExecutionException;
1213
import org.elasticsearch.action.search.ShardSearchFailure;
@@ -63,10 +64,13 @@ public static String getDetailedMessage(Throwable t) {
6364
* @param failures a collection of bulk item responses
6465
* @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found
6566
*/
66-
public static Exception getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
67+
public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
6768
for (BulkItemResponse failure : failures) {
68-
if (failure.getFailure().getCause() instanceof MapperParsingException) {
69-
return failure.getFailure().getCause();
69+
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(failure.getFailure().getCause());
70+
if (unwrappedThrowable instanceof MapperParsingException
71+
|| unwrappedThrowable instanceof IllegalArgumentException
72+
|| unwrappedThrowable instanceof ResourceNotFoundException) {
73+
return unwrappedThrowable;
7074
}
7175
}
7276

0 commit comments

Comments
 (0)