
Commit 65bc2d7

[ML] only retry persistence failures when the failure is intermittent and stop retrying when analytics job is stopping (elastic#53725)
This fixes two issues:

- The results persister would retry actions even when the failure is not intermittent. An example of a persistent failure is a doc mapping problem.
- Data frame analytics would continue retrying to persist results even after the job was stopped.

Closes elastic#53687
1 parent 4178c57 commit 65bc2d7
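
For context, here is a minimal sketch of the retry decision the commit message describes; it uses only the JDK, and the class, enum, and method names are hypothetical stand-ins, not part of this commit or of Elasticsearch. It shows the two conditions checked before another persistence attempt: the failure's REST status must not be irrecoverable, and the caller-supplied shouldRetry supplier must still return true (data frame analytics passes () -> isCancelled == false).

// Hypothetical sketch, not Elasticsearch code: illustrates the retry decision this commit introduces.
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Supplier;

class RetryDecisionSketch {

    // Toy stand-in for org.elasticsearch.rest.RestStatus, limited to the statuses used here.
    enum RestStatus { BAD_REQUEST, NOT_FOUND, FORBIDDEN, CONFLICT, TOO_MANY_REQUESTS }

    // Mirrors the spirit of the IRRECOVERABLE_REST_STATUSES set added to ResultsPersisterService below.
    static final Set<RestStatus> IRRECOVERABLE = EnumSet.of(
        RestStatus.BAD_REQUEST, RestStatus.NOT_FOUND, RestStatus.FORBIDDEN);

    // Retry only when the failure is potentially intermittent AND the caller still wants retries.
    static boolean shouldAttemptAgain(RestStatus failureStatus, Supplier<Boolean> shouldRetry) {
        if (IRRECOVERABLE.contains(failureStatus)) {
            return false; // e.g. a doc mapping problem surfaces as BAD_REQUEST and will never succeed
        }
        return shouldRetry.get(); // analytics passes () -> isCancelled == false, so this flips to false on stop
    }

    public static void main(String[] args) {
        Supplier<Boolean> jobStillRunning = () -> true;
        System.out.println(shouldAttemptAgain(RestStatus.TOO_MANY_REQUESTS, jobStillRunning)); // true: retry
        System.out.println(shouldAttemptAgain(RestStatus.BAD_REQUEST, jobStillRunning));       // false: give up
        System.out.println(shouldAttemptAgain(RestStatus.CONFLICT, () -> false));              // false: job stopping
    }
}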

4 files changed: +118 −9 lines

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java

+3 −2
@@ -108,6 +108,7 @@ public void awaitForCompletion() {
     }
 
     public void cancel() {
+        dataFrameRowsJoiner.cancel();
         isCancelled = true;
     }
 
@@ -264,12 +265,12 @@ private void indexStatsResult(ToXContentObject result, Function<String, String>
                 new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
                 WriteRequest.RefreshPolicy.IMMEDIATE,
                 docIdSupplier.apply(analytics.getId()),
-                () -> true,
+                () -> isCancelled == false,
                 errorMsg -> auditor.error(analytics.getId(),
                     "failed to persist result with id [" + docIdSupplier.apply(analytics.getId()) + "]; " + errorMsg)
             );
         } catch (IOException ioe) {
-            LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), ioe);
+            LOGGER.error(() -> new ParameterizedMessage("[{}] Failed serializing stats result", analytics.getId()), ioe);
         } catch (Exception e) {
             LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), e);
         }

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java

+11 −3
@@ -40,9 +40,9 @@ class DataFrameRowsJoiner implements AutoCloseable {
     private final Iterator<DataFrameDataExtractor.Row> dataFrameRowsIterator;
     private LinkedList<RowResults> currentResults;
     private volatile String failure;
+    private volatile boolean isCancelled;
 
-    DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor,
-                        ResultsPersisterService resultsPersisterService) {
+    DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor, ResultsPersisterService resultsPersisterService) {
         this.analyticsId = Objects.requireNonNull(analyticsId);
         this.dataExtractor = Objects.requireNonNull(dataExtractor);
         this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
@@ -70,6 +70,10 @@ void processRowResults(RowResults rowResults) {
         }
     }
 
+    void cancel() {
+        isCancelled = true;
+    }
+
     private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
         currentResults.add(rowResults);
         if (currentResults.size() == RESULTS_BATCH_SIZE) {
@@ -87,7 +91,11 @@ private void joinCurrentResults() {
         }
         if (bulkRequest.numberOfActions() > 0) {
             resultsPersisterService.bulkIndexWithHeadersWithRetry(
-                dataExtractor.getHeaders(), bulkRequest, analyticsId, () -> true, errorMsg -> {});
+                dataExtractor.getHeaders(),
+                bulkRequest,
+                analyticsId,
+                () -> isCancelled == false,
+                errorMsg -> {});
         }
         currentResults = new LinkedList<>();
     }

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java

+51 −2
@@ -9,6 +9,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.bulk.BulkAction;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -33,6 +34,8 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -42,6 +45,22 @@
 import java.util.stream.Collectors;
 
 public class ResultsPersisterService {
+    /**
+     * List of rest statuses that we consider irrecoverable
+     */
+    public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = Collections.unmodifiableSet(new HashSet<>(
+        Arrays.asList(
+            RestStatus.GONE,
+            RestStatus.NOT_IMPLEMENTED,
+            RestStatus.NOT_FOUND,
+            RestStatus.BAD_REQUEST,
+            RestStatus.UNAUTHORIZED,
+            RestStatus.FORBIDDEN,
+            RestStatus.METHOD_NOT_ALLOWED,
+            RestStatus.NOT_ACCEPTABLE
+        )
+    ));
+
     private static final Logger LOGGER = LogManager.getLogger(ResultsPersisterService.class);
 
     public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting(
@@ -124,9 +143,23 @@ private BulkResponse bulkIndexWithRetry(BulkRequest bulkRequest,
             if (bulkResponse.hasFailures() == false) {
                 return bulkResponse;
             }
-
+            for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
+                if (itemResponse.isFailed()) {
+                    if (isIrrecoverable(itemResponse.getFailure().getCause())) {
+                        Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause());
+                        LOGGER.warn(new ParameterizedMessage(
+                            "[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]",
+                            jobId,
+                            bulkResponse.buildFailureMessage()),
+                            unwrappedParticular);
+                        throw new ElasticsearchException(
+                            "{} experienced failure that cannot be automatically retried. See logs for bulk failures",
+                            unwrappedParticular,
+                            jobId);
+                    }
+                }
+            }
             retryContext.nextIteration("index", bulkResponse.buildFailureMessage());
-
             // We should only retry the docs that failed.
             bulkRequest = buildNewRequestFromFailures(bulkRequest, bulkResponse);
         }
@@ -148,12 +181,28 @@ public SearchResponse searchWithRetry(SearchRequest searchRequest,
             } catch (ElasticsearchException e) {
                 LOGGER.warn("[" + jobId + "] Exception while executing search action", e);
                 failureMessage = e.getDetailedMessage();
+                if (isIrrecoverable(e)) {
+                    LOGGER.warn(new ParameterizedMessage("[{}] experienced failure that cannot be automatically retried", jobId), e);
+                    throw new ElasticsearchException("{} experienced failure that cannot be automatically retried", e, jobId);
+                }
             }
 
             retryContext.nextIteration("search", failureMessage);
         }
     }
 
+    /**
+     * @param ex The exception to check
+     * @return true when the failure will persist no matter how many times we retry.
+     */
+    private static boolean isIrrecoverable(Exception ex) {
+        Throwable t = ExceptionsHelper.unwrapCause(ex);
+        if (t instanceof ElasticsearchException) {
+            return IRRECOVERABLE_REST_STATUSES.contains(((ElasticsearchException) t).status());
+        }
+        return false;
+    }
+
     /**
      * {@link RetryContext} object handles logic that is executed between consecutive retries of an action.
     *
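
As a quick illustration of the classification above (a usage example under assumptions, not part of the commit; it requires the elasticsearch and x-pack ML classes on the classpath): a BAD_REQUEST failure such as a doc mapping problem is in IRRECOVERABLE_REST_STATUSES and stops retries immediately, while a status like CONFLICT is not listed, so it still goes through the existing bounded retry loop.

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

public class IrrecoverableStatusExample {
    public static void main(String[] args) {
        // A doc mapping problem is reported as BAD_REQUEST: listed as irrecoverable, so no retry.
        ElasticsearchStatusException mappingProblem =
            new ElasticsearchStatusException("mapper_parsing_exception", RestStatus.BAD_REQUEST);
        // A CONFLICT is not in the set, so the normal retry loop still applies.
        ElasticsearchStatusException conflict =
            new ElasticsearchStatusException("version conflict", RestStatus.CONFLICT);

        System.out.println(ResultsPersisterService.IRRECOVERABLE_REST_STATUSES.contains(mappingProblem.status())); // true
        System.out.println(ResultsPersisterService.IRRECOVERABLE_REST_STATUSES.contains(conflict.status()));       // false
    }
}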

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java

+53 −2
@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.utils.persistence;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkAction;
@@ -28,8 +29,10 @@
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
@@ -133,7 +136,8 @@ public void testSearchWithRetries_SuccessAfterRetry() {
     }
 
     public void testSearchWithRetries_SuccessAfterRetryDueToException() {
-        doThrow(new IndexNotFoundException("my-index")).doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
+        doThrow(new IndexPrimaryShardNotAllocatedException(new Index("my-index", "UUID")))
+            .doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
             .when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
 
         List<String> messages = new ArrayList<>();
@@ -208,6 +212,21 @@ public void testSearchWithRetries_Failure_ShouldNotRetryAfterRandomNumberOfRetri
         verify(client, times(maxRetries + 1)).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
     }
 
+    public void testSearchWithRetries_FailureOnIrrecoverableError() {
+        resultsPersisterService.setMaxFailureRetries(5);
+
+        doAnswer(withFailure(new ElasticsearchStatusException("bad search request", RestStatus.BAD_REQUEST)))
+            .when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
+
+        ElasticsearchException e =
+            expectThrows(
+                ElasticsearchException.class,
+                () -> resultsPersisterService.searchWithRetry(SEARCH_REQUEST, JOB_ID, () -> true, (s) -> {}));
+        assertThat(e.getMessage(), containsString("experienced failure that cannot be automatically retried"));
+
+        verify(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
+    }
+
     private static Supplier<Boolean> shouldRetryUntil(int maxRetries) {
         return new Supplier<Boolean>() {
             int retries = 0;
@@ -242,6 +261,29 @@ public void testBulkRequestChangeOnFailures() {
         assertThat(lastMessage.get(), containsString("failed to index after [1] attempts. Will attempt again in"));
     }
 
+    public void testBulkRequestChangeOnIrrecoverableFailures() {
+        int maxFailureRetries = 10;
+        resultsPersisterService.setMaxFailureRetries(maxFailureRetries);
+        BulkItemResponse irrecoverable = new BulkItemResponse(
+            2,
+            DocWriteRequest.OpType.INDEX,
+            new BulkItemResponse.Failure("my-index", "fail", new ElasticsearchStatusException("boom", RestStatus.BAD_REQUEST)));
+        doAnswerWithResponses(
+            new BulkResponse(new BulkItemResponse[]{irrecoverable, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
+            new BulkResponse(new BulkItemResponse[0], 0L))
+            .when(client).execute(eq(BulkAction.INSTANCE), any(), any());
+
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(INDEX_REQUEST_FAILURE);
+        bulkRequest.add(INDEX_REQUEST_SUCCESS);
+
+        ElasticsearchException ex = expectThrows(ElasticsearchException.class,
+            () -> resultsPersisterService.bulkIndexWithRetry(bulkRequest, JOB_ID, () -> true, (s)->{}));
+
+        verify(client).execute(eq(BulkAction.INSTANCE), any(), any());
+        assertThat(ex.getMessage(), containsString("experienced failure that cannot be automatically retried."));
+    }
+
     public void testBulkRequestDoesNotRetryWhenSupplierIsFalse() {
         doAnswerWithResponses(
             new BulkResponse(new BulkItemResponse[]{BULK_ITEM_RESPONSE_FAILURE, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
@@ -317,6 +359,15 @@ private static <Response> Answer<Response> withResponse(Response response) {
         };
     }
 
+    @SuppressWarnings("unchecked")
+    private static <Response> Answer<Response> withFailure(Exception failure) {
+        return invocationOnMock -> {
+            ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
+            listener.onFailure(failure);
+            return null;
+        };
+    }
+
     private static ResultsPersisterService buildResultsPersisterService(OriginSettingClient client) {
         CheckedConsumer<Integer, InterruptedException> sleeper = millis -> {};
         ThreadPool tp = mock(ThreadPool.class);
