[ML] only retry persistence failures when the failure is intermittent and stop retrying when analytics job is stopping #53725

Merged
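
In outline, the diffs below make two related changes to result persistence: ResultsPersisterService now gives up immediately when a failure maps to an irrecoverable REST status (the new IRRECOVERABLE_REST_STATUSES set), and the data frame analytics code passes () -> isCancelled == false as the retry predicate so retries stop once the job is stopping. A rough paraphrase of the resulting bulk-retry behavior is sketched here; executeBulk is a stand-in helper and the placement of the shouldRetry check is schematic rather than the exact code:

    // Rough paraphrase only; see the ResultsPersisterService diff below for the real logic.
    private BulkResponse bulkIndexWithRetry(BulkRequest request, String jobId, Supplier<Boolean> shouldRetry) {
        while (true) {
            BulkResponse response = executeBulk(request);   // stand-in for the real client call
            if (response.hasFailures() == false) {
                return response;
            }
            // New: a failure whose REST status is in IRRECOVERABLE_REST_STATUSES (e.g. BAD_REQUEST,
            // FORBIDDEN) will never succeed, so fail fast instead of retrying.
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed() && isIrrecoverable(item.getFailure().getCause())) {
                    throw new ElasticsearchException(
                        "{} experienced failure that cannot be automatically retried", jobId);
                }
            }
            // The caller-supplied predicate stops further retries, e.g. once the analytics job is stopping
            // (the result processor passes () -> isCancelled == false).
            if (shouldRetry.get() == false) {
                throw new ElasticsearchException("[{}] persistence failed and should not be retried", jobId);
            }
            request = buildNewRequestFromFailures(request, response);  // only resend the docs that failed
        }
    }

searchWithRetry gains the same irrecoverable-status check, as the ResultsPersisterService diff shows.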
AnalyticsResultProcessor.java
@@ -85,7 +85,7 @@ public AnalyticsResultProcessor(DataFrameAnalyticsConfig analytics, DataFrameRow
DataFrameAnalyticsAuditor auditor, ResultsPersisterService resultsPersisterService,
List<ExtractedField> fieldNames) {
this.analytics = Objects.requireNonNull(analytics);
this.dataFrameRowsJoiner = Objects.requireNonNull(dataFrameRowsJoiner);
this.dataFrameRowsJoiner = Objects.requireNonNull(dataFrameRowsJoiner).setShouldRetryPersistence(() -> isCancelled == false);
this.statsHolder = Objects.requireNonNull(statsHolder);
this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider);
this.auditor = Objects.requireNonNull(auditor);
@@ -265,12 +265,12 @@ private void indexStatsResult(ToXContentObject result, Function<String, String>
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
WriteRequest.RefreshPolicy.IMMEDIATE,
docIdSupplier.apply(analytics.getId()),
() -> true,
() -> isCancelled == false,
errorMsg -> auditor.error(analytics.getId(),
"failed to persist result with id [" + docIdSupplier.apply(analytics.getId()) + "]; " + errorMsg)
);
} catch (IOException ioe) {
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), ioe);
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed serializing stats result", analytics.getId()), ioe);
} catch (Exception e) {
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), e);
}
DataFrameRowsJoiner.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

class DataFrameRowsJoiner implements AutoCloseable {

@@ -37,12 +38,12 @@ class DataFrameRowsJoiner implements AutoCloseable {
private final String analyticsId;
private final DataFrameDataExtractor dataExtractor;
private final ResultsPersisterService resultsPersisterService;
private Supplier<Boolean> shouldRetryPersistence = () -> true;
Contributor:

Could this be final and passed in the constructor?

Member Author:

Since the retry persistence logic depends on information stored within AnalyticsResultProcessor, we would either have to make a Builder or a Factory class that we then pass to AnalyticsResultProcessor, which would then construct a new joiner from that factory.

This option just seemed simpler.
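
For illustration, the constructor-injection route being described would need something along these lines (hypothetical sketch, not part of this PR):

    // Hypothetical factory: AnalyticsResultProcessor would receive this instead of a ready-made
    // joiner, so the retry predicate could capture the processor's own isCancelled flag.
    interface DataFrameRowsJoinerFactory {
        DataFrameRowsJoiner create(Supplier<Boolean> shouldRetryPersistence);
    }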

Contributor:

Ok, got it. Please also consider the following approach, which might be even less complex:

Add an isCancelled field and a cancel() method to the DataFrameRowsJoiner class. In AnalyticsResultProcessor::cancel(), you can also cancel the joiner:

    public void cancel() {
        isCancelled = true;
        dataFrameRowsJoiner.cancel();
    }

Then there is no need to pass the Supplier, as you have already passed the actual isCancelled value.

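A minimal sketch of what that alternative would look like on the joiner side (hypothetical; the PR keeps the Supplier-based approach shown in this diff):

    // Hypothetical DataFrameRowsJoiner changes under this suggestion.
    private volatile boolean isCancelled;

    void cancel() {
        isCancelled = true;
    }

    private void joinCurrentResults() {
        // ...
        resultsPersisterService.bulkIndexWithHeadersWithRetry(
            dataExtractor.getHeaders(), bulkRequest, analyticsId, () -> isCancelled == false, errorMsg -> {});
    }
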
private final Iterator<DataFrameDataExtractor.Row> dataFrameRowsIterator;
private LinkedList<RowResults> currentResults;
private volatile String failure;

DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor,
ResultsPersisterService resultsPersisterService) {
DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor, ResultsPersisterService resultsPersisterService) {
this.analyticsId = Objects.requireNonNull(analyticsId);
this.dataExtractor = Objects.requireNonNull(dataExtractor);
this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
@@ -70,6 +71,11 @@ void processRowResults(RowResults rowResults) {
}
}

DataFrameRowsJoiner setShouldRetryPersistence(Supplier<Boolean> shouldRetryPersistence) {
this.shouldRetryPersistence = shouldRetryPersistence;
return this;
}

private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
currentResults.add(rowResults);
if (currentResults.size() == RESULTS_BATCH_SIZE) {
@@ -87,7 +93,11 @@ private void joinCurrentResults() {
}
if (bulkRequest.numberOfActions() > 0) {
resultsPersisterService.bulkIndexWithHeadersWithRetry(
dataExtractor.getHeaders(), bulkRequest, analyticsId, () -> true, errorMsg -> {});
dataExtractor.getHeaders(),
bulkRequest,
analyticsId,
shouldRetryPersistence,
errorMsg -> {});
}
currentResults = new LinkedList<>();
}
ResultsPersisterService.java
@@ -9,6 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -33,6 +34,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -42,6 +44,22 @@
import java.util.stream.Collectors;

public class ResultsPersisterService {
/**
* REST statuses that we consider irrecoverable
*/
public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = new HashSet<>(
Contributor:

nit: since the name is in capitals it should be immutable, i.e. Set.of() or Collections.unmodifiableSet().

Arrays.asList(
RestStatus.GONE,
RestStatus.NOT_IMPLEMENTED,
RestStatus.NOT_FOUND,
RestStatus.BAD_REQUEST,
RestStatus.UNAUTHORIZED,
RestStatus.FORBIDDEN,
RestStatus.METHOD_NOT_ALLOWED,
RestStatus.NOT_ACCEPTABLE
)
);
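
The immutable form the reviewer asks for above would look roughly like this (assuming Java 9+ Set.of is available; the diff as shown still uses a HashSet):

    public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = Set.of(
        RestStatus.GONE,
        RestStatus.NOT_IMPLEMENTED,
        RestStatus.NOT_FOUND,
        RestStatus.BAD_REQUEST,
        RestStatus.UNAUTHORIZED,
        RestStatus.FORBIDDEN,
        RestStatus.METHOD_NOT_ALLOWED,
        RestStatus.NOT_ACCEPTABLE
    );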

private static final Logger LOGGER = LogManager.getLogger(ResultsPersisterService.class);

public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting(
@@ -124,9 +142,23 @@ private BulkResponse bulkIndexWithRetry(BulkRequest bulkRequest,
if (bulkResponse.hasFailures() == false) {
return bulkResponse;
}

for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
if (itemResponse.isFailed()) {
if (isIrrecoverable(itemResponse.getFailure().getCause())) {
Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause());
LOGGER.warn(new ParameterizedMessage(
"[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]",
jobId,
bulkResponse.buildFailureMessage()),
unwrappedParticular);
throw new ElasticsearchException(
"{} experienced failure that cannot be automatically retried. See logs for bulk failures",
unwrappedParticular,
jobId);
}
}
}
retryContext.nextIteration("index", bulkResponse.buildFailureMessage());

// We should only retry the docs that failed.
bulkRequest = buildNewRequestFromFailures(bulkRequest, bulkResponse);
}
@@ -148,12 +180,28 @@ public SearchResponse searchWithRetry(SearchRequest searchRequest,
} catch (ElasticsearchException e) {
LOGGER.warn("[" + jobId + "] Exception while executing search action", e);
failureMessage = e.getDetailedMessage();
if (isIrrecoverable(e)) {
LOGGER.warn(new ParameterizedMessage("[{}] experienced irrecoverable failure", jobId), e);
Contributor:

Could you make this message consistent with others? "...failure that cannot be..."

throw new ElasticsearchException("{} experienced failure that cannot be automatically retried", e, jobId);
}
}

retryContext.nextIteration("search", failureMessage);
}
}

/**
* @param ex The exception to check
* @return true when the failure will persist no matter how many times we retry.
*/
static boolean isIrrecoverable(Exception ex) {
Contributor:

Should this be "private"?

Throwable t = ExceptionsHelper.unwrapCause(ex);
if (t instanceof ElasticsearchException) {
return IRRECOVERABLE_REST_STATUSES.contains(((ElasticsearchException) t).status());
}
return false;
}
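
To illustrate the classification, a test-style fragment (assuming same-package access to the package-private helper and JUnit-style assertions):

    // A 400 maps to a status in IRRECOVERABLE_REST_STATUSES, so retrying cannot help;
    // a 503 is not in the set, so it is treated as intermittent and stays inside the retry loop.
    assertTrue(ResultsPersisterService.isIrrecoverable(
        new ElasticsearchStatusException("bad request", RestStatus.BAD_REQUEST)));
    assertFalse(ResultsPersisterService.isIrrecoverable(
        new ElasticsearchStatusException("shard unavailable", RestStatus.SERVICE_UNAVAILABLE)));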

/**
* {@link RetryContext} object handles logic that is executed between consecutive retries of an action.
*
AnalyticsResultProcessorTests.java
@@ -74,6 +74,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
public void setUpMocks() {
process = mock(AnalyticsProcess.class);
dataFrameRowsJoiner = mock(DataFrameRowsJoiner.class);
when(dataFrameRowsJoiner.setShouldRetryPersistence(any())).thenReturn(dataFrameRowsJoiner);
trainedModelProvider = mock(TrainedModelProvider.class);
auditor = mock(DataFrameAnalyticsAuditor.class);
resultsPersisterService = mock(ResultsPersisterService.class);
@@ -94,6 +95,7 @@ public void testProcess_GivenNoResults() {
resultProcessor.process(process);
resultProcessor.awaitForCompletion();

verify(dataFrameRowsJoiner).setShouldRetryPersistence(any());
verify(dataFrameRowsJoiner).close();
verifyNoMoreInteractions(dataFrameRowsJoiner);
}
@@ -106,6 +108,7 @@ public void testProcess_GivenEmptyResults() {
resultProcessor.process(process);
resultProcessor.awaitForCompletion();

verify(dataFrameRowsJoiner).setShouldRetryPersistence(any());
verify(dataFrameRowsJoiner).close();
Mockito.verifyNoMoreInteractions(dataFrameRowsJoiner);
assertThat(statsHolder.getProgressTracker().writingResultsPercent.get(), equalTo(100));
ResultsPersisterServiceTests.java
@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.utils.persistence;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
@@ -27,8 +28,10 @@
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
@@ -131,7 +134,8 @@ public void testSearchWithRetries_SuccessAfterRetry() {
}

public void testSearchWithRetries_SuccessAfterRetryDueToException() {
doThrow(new IndexNotFoundException("my-index")).doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
doThrow(new IndexPrimaryShardNotAllocatedException(new Index("my-index", "UUID")))
.doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
.when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());

List<String> messages = new ArrayList<>();
@@ -206,6 +210,21 @@ public void testSearchWithRetries_Failure_ShouldNotRetryAfterRandomNumberOfRetri
verify(client, times(maxRetries + 1)).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
}

public void testSearchWithRetries_FailureOnIrrecoverableError() {
resultsPersisterService.setMaxFailureRetries(5);

doAnswer(withFailure(new ElasticsearchStatusException("bad search request", RestStatus.BAD_REQUEST)))
.when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());

ElasticsearchException e =
expectThrows(
ElasticsearchException.class,
() -> resultsPersisterService.searchWithRetry(SEARCH_REQUEST, JOB_ID, () -> true, (s) -> {}));
assertThat(e.getMessage(), containsString("experienced failure that cannot be automatically retried"));

verify(client, times(1)).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
Contributor:

I think times(1) is the default and you can drop it.

}

private static Supplier<Boolean> shouldRetryUntil(int maxRetries) {
return new Supplier<>() {
int retries = 0;
@@ -240,6 +259,29 @@ public void testBulkRequestChangeOnFailures() {
assertThat(lastMessage.get(), containsString("failed to index after [1] attempts. Will attempt again in"));
}

public void testBulkRequestChangeOnIrrecoverableFailures() {
int maxFailureRetries = 10;
resultsPersisterService.setMaxFailureRetries(maxFailureRetries);
BulkItemResponse irrecoverable = new BulkItemResponse(
2,
DocWriteRequest.OpType.INDEX,
new BulkItemResponse.Failure("my-index", "fail", new ElasticsearchStatusException("boom", RestStatus.BAD_REQUEST)));
doAnswerWithResponses(
new BulkResponse(new BulkItemResponse[]{irrecoverable, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
new BulkResponse(new BulkItemResponse[0], 0L))
.when(client).execute(eq(BulkAction.INSTANCE), any(), any());

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(INDEX_REQUEST_FAILURE);
bulkRequest.add(INDEX_REQUEST_SUCCESS);

ElasticsearchException ex = expectThrows(ElasticsearchException.class,
() -> resultsPersisterService.bulkIndexWithRetry(bulkRequest, JOB_ID, () -> true, (s)->{}));

verify(client, times(1)).execute(eq(BulkAction.INSTANCE), any(), any());
assertThat(ex.getMessage(), containsString("experienced failure that cannot be automatically retried."));
}

public void testBulkRequestDoesNotRetryWhenSupplierIsFalse() {
doAnswerWithResponses(
new BulkResponse(new BulkItemResponse[]{BULK_ITEM_RESPONSE_FAILURE, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
@@ -315,6 +357,15 @@ private static <Response> Answer<Response> withResponse(Response response) {
};
}

@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withFailure(Exception failure) {
return invocationOnMock -> {
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
listener.onFailure(failure);
return null;
};
}

private static ResultsPersisterService buildResultsPersisterService(OriginSettingClient client) {
CheckedConsumer<Integer, InterruptedException> sleeper = millis -> {};
ThreadPool tp = mock(ThreadPool.class);