Skip to content

[Transform] improve irrecoverable error detection #51820

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
// Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments
// the indexing failure counter
// and possibly retries)
Exception irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(
Throwable irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(
deduplicatedFailures.values()
);
if (irrecoverableException == null) {
Expand Down Expand Up @@ -373,7 +373,7 @@ private static String getBulkIndexDetailedFailureMessage(String prefix, Map<Stri
return failureMessage;
}

private static Exception decorateBulkIndexException(Exception irrecoverableException) {
private static Throwable decorateBulkIndexException(Throwable irrecoverableException) {
if (irrecoverableException instanceof MapperParsingException) {
return new TransformException(
"Destination index mappings are incompatible with the transform configuration.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,28 +484,30 @@ synchronized void handleFailure(Exception e) {
handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException);
} else if (unwrappedException instanceof IndexNotFoundException
|| unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
|| unwrappedException instanceof TransformConfigReloadingException) {
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
} else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
failIndexer(
"task encountered more than "
+ context.getNumFailureRetries()
+ " failures; latest failure: "
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
);
} else {
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);

auditor.warning(
getJobId(),
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
|| unwrappedException instanceof TransformConfigReloadingException
|| unwrappedException instanceof ResourceNotFoundException
|| unwrappedException instanceof IllegalArgumentException) {
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
} else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
failIndexer(
"task encountered more than "
+ context.getNumFailureRetries()
+ " failures; latest failure: "
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
);
lastAuditedExceptionMessage = message;
} else {
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);

auditor.warning(
getJobId(),
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
);
lastAuditedExceptionMessage = message;
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.transform.utils;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
Expand Down Expand Up @@ -63,10 +64,13 @@ public static String getDetailedMessage(Throwable t) {
* @param failures a collection of bulk item responses
* @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found
*/
public static Exception getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
for (BulkItemResponse failure : failures) {
if (failure.getFailure().getCause() instanceof MapperParsingException) {
return failure.getFailure().getCause();
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(failure.getFailure().getCause());
if (unwrappedThrowable instanceof MapperParsingException
|| unwrappedThrowable instanceof IllegalArgumentException
|| unwrappedThrowable instanceof ResourceNotFoundException) {
return unwrappedThrowable;
}
}

Expand Down