Skip to content

[7.x] [ML] only retry persistence failures when the failure is intermittent and stop retrying when analytics job is stopping (#53725) #53808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void awaitForCompletion() {
}

// Marks this results processor as cancelled. The rows joiner is cancelled first so
// that any in-flight bulk persistence stops retrying, then the (volatile) flag is
// set so subsequent persistence calls see the cancellation.
public void cancel() {
dataFrameRowsJoiner.cancel();
isCancelled = true;
}

Expand Down Expand Up @@ -264,12 +265,12 @@ private void indexStatsResult(ToXContentObject result, Function<String, String>
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
WriteRequest.RefreshPolicy.IMMEDIATE,
docIdSupplier.apply(analytics.getId()),
() -> true,
() -> isCancelled == false,
errorMsg -> auditor.error(analytics.getId(),
"failed to persist result with id [" + docIdSupplier.apply(analytics.getId()) + "]; " + errorMsg)
);
} catch (IOException ioe) {
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), ioe);
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed serializing stats result", analytics.getId()), ioe);
} catch (Exception e) {
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ class DataFrameRowsJoiner implements AutoCloseable {
private final Iterator<DataFrameDataExtractor.Row> dataFrameRowsIterator;
private LinkedList<RowResults> currentResults;
private volatile String failure;
private volatile boolean isCancelled;

DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor,
ResultsPersisterService resultsPersisterService) {
DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor, ResultsPersisterService resultsPersisterService) {
this.analyticsId = Objects.requireNonNull(analyticsId);
this.dataExtractor = Objects.requireNonNull(dataExtractor);
this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
Expand Down Expand Up @@ -70,6 +70,10 @@ void processRowResults(RowResults rowResults) {
}
}

// Signals cancellation: flips the volatile flag that the retry-continuation supplier
// (passed to ResultsPersisterService as "() -> isCancelled == false") checks, so
// pending bulk-index retries stop instead of retrying forever.
void cancel() {
isCancelled = true;
}

private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
currentResults.add(rowResults);
if (currentResults.size() == RESULTS_BATCH_SIZE) {
Expand All @@ -87,7 +91,11 @@ private void joinCurrentResults() {
}
if (bulkRequest.numberOfActions() > 0) {
resultsPersisterService.bulkIndexWithHeadersWithRetry(
dataExtractor.getHeaders(), bulkRequest, analyticsId, () -> true, errorMsg -> {});
dataExtractor.getHeaders(),
bulkRequest,
analyticsId,
() -> isCancelled == false,
errorMsg -> {});
}
currentResults = new LinkedList<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
Expand All @@ -33,6 +34,8 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
Expand All @@ -42,6 +45,22 @@
import java.util.stream.Collectors;

public class ResultsPersisterService {
/**
 * REST statuses considered irrecoverable: a request that failed with one of these
 * would fail identically on every retry (the request itself is invalid, forbidden,
 * or targets something that is gone), so retry loops abort immediately instead of
 * exhausting the retry budget.
 */
public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = Collections.unmodifiableSet(new HashSet<>(
Arrays.asList(
RestStatus.GONE,
RestStatus.NOT_IMPLEMENTED,
RestStatus.NOT_FOUND,
RestStatus.BAD_REQUEST,
RestStatus.UNAUTHORIZED,
RestStatus.FORBIDDEN,
RestStatus.METHOD_NOT_ALLOWED,
RestStatus.NOT_ACCEPTABLE
)
));

private static final Logger LOGGER = LogManager.getLogger(ResultsPersisterService.class);

public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting(
Expand Down Expand Up @@ -124,9 +143,23 @@ private BulkResponse bulkIndexWithRetry(BulkRequest bulkRequest,
if (bulkResponse.hasFailures() == false) {
return bulkResponse;
}

for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
if (itemResponse.isFailed()) {
if (isIrrecoverable(itemResponse.getFailure().getCause())) {
Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause());
LOGGER.warn(new ParameterizedMessage(
"[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]",
jobId,
bulkResponse.buildFailureMessage()),
unwrappedParticular);
throw new ElasticsearchException(
"{} experienced failure that cannot be automatically retried. See logs for bulk failures",
unwrappedParticular,
jobId);
}
}
}
retryContext.nextIteration("index", bulkResponse.buildFailureMessage());

// We should only retry the docs that failed.
bulkRequest = buildNewRequestFromFailures(bulkRequest, bulkResponse);
}
Expand All @@ -148,12 +181,28 @@ public SearchResponse searchWithRetry(SearchRequest searchRequest,
} catch (ElasticsearchException e) {
LOGGER.warn("[" + jobId + "] Exception while executing search action", e);
failureMessage = e.getDetailedMessage();
if (isIrrecoverable(e)) {
LOGGER.warn(new ParameterizedMessage("[{}] experienced failure that cannot be automatically retried", jobId), e);
throw new ElasticsearchException("{} experienced failure that cannot be automatically retried", e, jobId);
}
}

retryContext.nextIteration("search", failureMessage);
}
}

/**
 * Determines whether retrying is pointless for the given failure.
 *
 * @param ex the exception thrown by the failed action
 * @return {@code true} if the unwrapped cause is an {@link ElasticsearchException}
 *         whose REST status is one of {@link #IRRECOVERABLE_REST_STATUSES}, i.e.
 *         the failure will persist no matter how many times we retry
 */
private static boolean isIrrecoverable(Exception ex) {
    Throwable cause = ExceptionsHelper.unwrapCause(ex);
    return cause instanceof ElasticsearchException
        && IRRECOVERABLE_REST_STATUSES.contains(((ElasticsearchException) cause).status());
}

/**
* {@link RetryContext} object handles logic that is executed between consecutive retries of an action.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.utils.persistence;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
Expand All @@ -28,8 +29,10 @@
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
Expand Down Expand Up @@ -133,7 +136,8 @@ public void testSearchWithRetries_SuccessAfterRetry() {
}

public void testSearchWithRetries_SuccessAfterRetryDueToException() {
doThrow(new IndexNotFoundException("my-index")).doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
doThrow(new IndexPrimaryShardNotAllocatedException(new Index("my-index", "UUID")))
.doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
.when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());

List<String> messages = new ArrayList<>();
Expand Down Expand Up @@ -208,6 +212,21 @@ public void testSearchWithRetries_Failure_ShouldNotRetryAfterRandomNumberOfRetri
verify(client, times(maxRetries + 1)).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
}

// Verifies that a search failing with an irrecoverable status (400 BAD_REQUEST) is
// not retried: the exception propagates immediately and the client is invoked
// exactly once, even though up to 5 retries are allowed.
public void testSearchWithRetries_FailureOnIrrecoverableError() {
resultsPersisterService.setMaxFailureRetries(5);

// Stub the client so every search attempt fails with a 400.
doAnswer(withFailure(new ElasticsearchStatusException("bad search request", RestStatus.BAD_REQUEST)))
.when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());

ElasticsearchException e =
expectThrows(
ElasticsearchException.class,
() -> resultsPersisterService.searchWithRetry(SEARCH_REQUEST, JOB_ID, () -> true, (s) -> {}));
assertThat(e.getMessage(), containsString("experienced failure that cannot be automatically retried"));

// Exactly one attempt: an irrecoverable error must short-circuit the retry loop.
verify(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
}

private static Supplier<Boolean> shouldRetryUntil(int maxRetries) {
return new Supplier<Boolean>() {
int retries = 0;
Expand Down Expand Up @@ -242,6 +261,29 @@ public void testBulkRequestChangeOnFailures() {
assertThat(lastMessage.get(), containsString("failed to index after [1] attempts. Will attempt again in"));
}

// Verifies that a bulk response containing an item that failed with an irrecoverable
// status (400 BAD_REQUEST) aborts the retry loop: the exception propagates and the
// client executes only one bulk request, despite a generous retry budget.
public void testBulkRequestChangeOnIrrecoverableFailures() {
int maxFailureRetries = 10;
resultsPersisterService.setMaxFailureRetries(maxFailureRetries);
// A bulk item whose failure cause maps to an irrecoverable REST status.
BulkItemResponse irrecoverable = new BulkItemResponse(
2,
DocWriteRequest.OpType.INDEX,
new BulkItemResponse.Failure("my-index", "_doc", "fail", new ElasticsearchStatusException("boom", RestStatus.BAD_REQUEST)));
// First response mixes the irrecoverable failure with a success; the second
// (empty) response should never be reached because retrying must stop.
doAnswerWithResponses(
new BulkResponse(new BulkItemResponse[]{irrecoverable, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
new BulkResponse(new BulkItemResponse[0], 0L))
.when(client).execute(eq(BulkAction.INSTANCE), any(), any());

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(INDEX_REQUEST_FAILURE);
bulkRequest.add(INDEX_REQUEST_SUCCESS);

ElasticsearchException ex = expectThrows(ElasticsearchException.class,
() -> resultsPersisterService.bulkIndexWithRetry(bulkRequest, JOB_ID, () -> true, (s)->{}));

// Exactly one bulk execution: the irrecoverable item must prevent any retry.
verify(client).execute(eq(BulkAction.INSTANCE), any(), any());
assertThat(ex.getMessage(), containsString("experienced failure that cannot be automatically retried."));
}

public void testBulkRequestDoesNotRetryWhenSupplierIsFalse() {
doAnswerWithResponses(
new BulkResponse(new BulkItemResponse[]{BULK_ITEM_RESPONSE_FAILURE, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
Expand Down Expand Up @@ -317,6 +359,15 @@ private static <Response> Answer<Response> withResponse(Response response) {
};
}

/**
 * Builds a Mockito {@code Answer} that completes the {@code ActionListener}
 * (expected as the third invocation argument) exceptionally with the given failure.
 *
 * @param failure the exception delivered to the listener's {@code onFailure}
 */
@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withFailure(Exception failure) {
    return invocation -> {
        Object rawListener = invocation.getArguments()[2];
        ((ActionListener<Response>) rawListener).onFailure(failure);
        return null;
    };
}

private static ResultsPersisterService buildResultsPersisterService(OriginSettingClient client) {
CheckedConsumer<Integer, InterruptedException> sleeper = millis -> {};
ThreadPool tp = mock(ThreadPool.class);
Expand Down