Skip to content

Commit d7ffa7f

Browse files
authored
[7.x][ML] Add graceful retry for anomaly detector result indexing failures (#49508) (#50145)
* [ML] Add graceful retry for anomaly detector result indexing failures (#49508) All results indexing is now retried up to the number of times configured in `xpack.ml.persist_results_max_retries`. The retries use a semi-random, exponential backoff. * fixing test
1 parent f42537a commit d7ffa7f

21 files changed

+972
-223
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.integration;
7+
8+
import org.apache.logging.log4j.Logger;
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.bulk.BulkItemResponse;
11+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
12+
import org.elasticsearch.action.bulk.BulkResponse;
13+
import org.elasticsearch.action.index.IndexRequest;
14+
import org.elasticsearch.action.support.WriteRequest;
15+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
16+
import org.elasticsearch.cluster.metadata.IndexMetaData;
17+
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.common.unit.TimeValue;
19+
import org.elasticsearch.xpack.core.action.util.PageParams;
20+
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
21+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
22+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
23+
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
24+
import org.elasticsearch.xpack.core.ml.job.config.Detector;
25+
import org.elasticsearch.xpack.core.ml.job.config.Job;
26+
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
27+
import org.elasticsearch.xpack.core.ml.job.results.Result;
28+
import org.junit.After;
29+
import org.junit.Before;
30+
31+
import java.time.Duration;
32+
import java.util.Collections;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.atomic.AtomicReference;
35+
import java.util.function.Consumer;
36+
37+
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
38+
import static org.hamcrest.Matchers.greaterThan;
39+
40+
public class BulkFailureRetryIT extends MlNativeAutodetectIntegTestCase {
41+
42+
private final String index = "bulk-failure-retry";
43+
private long now = System.currentTimeMillis();
44+
private static long DAY = Duration.ofDays(1).toMillis();
45+
private final String jobId = "bulk-failure-retry-job";
46+
private final String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job";
47+
48+
@Before
49+
public void putPastDataIntoIndex() {
50+
client().admin().indices().prepareCreate(index)
51+
.addMapping("type", "time", "type=date", "value", "type=long")
52+
.get();
53+
long twoDaysAgo = now - DAY * 2;
54+
long threeDaysAgo = now - DAY * 3;
55+
writeData(logger, index, 250, threeDaysAgo, twoDaysAgo);
56+
}
57+
58+
@After
59+
public void cleanUpTest() {
60+
client().admin()
61+
.cluster()
62+
.prepareUpdateSettings()
63+
.setTransientSettings(Settings.builder()
64+
.putNull("xpack.ml.persist_results_max_retries")
65+
.putNull("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob")
66+
.putNull("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister")
67+
.putNull("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output")
68+
.build()).get();
69+
cleanUp();
70+
}
71+
72+
private void ensureAnomaliesWrite() throws InterruptedException {
73+
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
74+
AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
75+
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
76+
blockingCall(
77+
listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
78+
acknowledgedResponseHolder,
79+
exceptionHolder);
80+
if (exceptionHolder.get() != null) {
81+
fail("FAILED TO MARK ["+ resultsIndex + "] as read-write again" + exceptionHolder.get());
82+
}
83+
}
84+
85+
private void setAnomaliesReadOnlyBlock() throws InterruptedException {
86+
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
87+
AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
88+
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
89+
blockingCall(
90+
listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
91+
acknowledgedResponseHolder,
92+
exceptionHolder);
93+
if (exceptionHolder.get() != null) {
94+
fail("FAILED TO MARK ["+ resultsIndex + "] as read-ONLY: " + exceptionHolder.get());
95+
}
96+
}
97+
98+
public void testBulkFailureRetries() throws Exception {
99+
Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
100+
job.setResultsIndexName(jobId);
101+
102+
DatafeedConfig.Builder datafeedConfigBuilder =
103+
createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
104+
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
105+
registerJob(job);
106+
putJob(job);
107+
openJob(job.getId());
108+
registerDatafeed(datafeedConfig);
109+
putDatafeed(datafeedConfig);
110+
long twoDaysAgo = now - 2 * DAY;
111+
startDatafeed(datafeedConfig.getId(), 0L, twoDaysAgo);
112+
waitUntilJobIsClosed(jobId);
113+
114+
// Get the job stats
115+
Bucket initialLatestBucket = getLatestFinalizedBucket(jobId);
116+
assertThat(initialLatestBucket.getEpoch(), greaterThan(0L));
117+
118+
client().admin()
119+
.cluster()
120+
.prepareUpdateSettings()
121+
.setTransientSettings(Settings.builder()
122+
.put("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob", "TRACE")
123+
.put("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister", "TRACE")
124+
.put("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output", "TRACE")
125+
.put("xpack.ml.persist_results_max_retries", "15")
126+
.build()).get();
127+
128+
setAnomaliesReadOnlyBlock();
129+
130+
int moreDocs = 1_000;
131+
writeData(logger, index, moreDocs, twoDaysAgo, now);
132+
133+
openJob(job.getId());
134+
startDatafeed(datafeedConfig.getId(), twoDaysAgo, now);
135+
136+
ensureAnomaliesWrite();
137+
waitUntilJobIsClosed(jobId);
138+
139+
Bucket newLatestBucket = getLatestFinalizedBucket(jobId);
140+
assertThat(newLatestBucket.getEpoch(), greaterThan(initialLatestBucket.getEpoch()));
141+
}
142+
143+
private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
144+
return createJob(id, bucketSpan, function, field, null);
145+
}
146+
147+
private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
148+
DataDescription.Builder dataDescription = new DataDescription.Builder();
149+
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
150+
dataDescription.setTimeField("time");
151+
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
152+
153+
Detector.Builder d = new Detector.Builder(function, field);
154+
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()))
155+
.setBucketSpan(bucketSpan)
156+
.setSummaryCountFieldName(summaryCountField);
157+
158+
return new Job.Builder().setId(id).setAnalysisConfig(analysisConfig).setDataDescription(dataDescription);
159+
}
160+
161+
private void writeData(Logger logger, String index, long numDocs, long start, long end) {
162+
int maxDelta = (int) (end - start - 1);
163+
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
164+
for (int i = 0; i < numDocs; i++) {
165+
IndexRequest indexRequest = new IndexRequest(index);
166+
long timestamp = start + randomIntBetween(0, maxDelta);
167+
assert timestamp >= start && timestamp < end;
168+
indexRequest.source("time", timestamp, "value", i);
169+
bulkRequestBuilder.add(indexRequest);
170+
}
171+
BulkResponse bulkResponse = bulkRequestBuilder
172+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
173+
.get();
174+
if (bulkResponse.hasFailures()) {
175+
int failures = 0;
176+
for (BulkItemResponse itemResponse : bulkResponse) {
177+
if (itemResponse.isFailed()) {
178+
failures++;
179+
logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
180+
}
181+
}
182+
fail("Bulk response contained " + failures + " failures");
183+
}
184+
logger.info("Indexed [{}] documents", numDocs);
185+
}
186+
187+
private Bucket getLatestFinalizedBucket(String jobId) {
188+
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
189+
getBucketsRequest.setExcludeInterim(true);
190+
getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName());
191+
getBucketsRequest.setDescending(true);
192+
getBucketsRequest.setPageParams(new PageParams(0, 1));
193+
return getBuckets(getBucketsRequest).get(0);
194+
}
195+
196+
private <T> void blockingCall(Consumer<ActionListener<T>> function,
197+
AtomicReference<T> response,
198+
AtomicReference<Exception> error) throws InterruptedException {
199+
CountDownLatch latch = new CountDownLatch(1);
200+
ActionListener<T> listener = ActionListener.wrap(
201+
r -> {
202+
response.set(r);
203+
latch.countDown();
204+
},
205+
e -> {
206+
error.set(e);
207+
latch.countDown();
208+
}
209+
);
210+
211+
function.accept(listener);
212+
latch.await();
213+
}
214+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@
295295
import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction;
296296
import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
297297
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
298+
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
298299

299300
import java.io.IOException;
300301
import java.math.BigInteger;
@@ -445,7 +446,8 @@ public List<Setting<?>> getSettings() {
445446
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION,
446447
InferenceProcessor.MAX_INFERENCE_PROCESSORS,
447448
ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE,
448-
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL));
449+
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
450+
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES));
449451
}
450452

451453
public Settings additionalSettings() {
@@ -518,9 +520,12 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
518520
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
519521
InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName());
520522
this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
523+
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(client, clusterService, settings);
521524
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
522-
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
523-
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
525+
JobResultsPersister jobResultsPersister = new JobResultsPersister(client, resultsPersisterService, anomalyDetectionAuditor);
526+
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client,
527+
resultsPersisterService,
528+
anomalyDetectionAuditor);
524529
JobConfigProvider jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
525530
DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
526531
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(client, clusterService, threadPool);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ private ActionListener<RevertModelSnapshotAction.Response> wrapRevertDataCountsL
170170
return ActionListener.wrap(response -> {
171171
jobResultsProvider.dataCounts(jobId, counts -> {
172172
counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp());
173-
jobDataCountsPersister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() {
173+
jobDataCountsPersister.persistDataCountsAsync(jobId, counts, new ActionListener<Boolean>() {
174174
@Override
175175
public void onResponse(Boolean aBoolean) {
176176
listener.onResponse(response);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java

+42-7
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,19 @@
88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
1010
import org.apache.logging.log4j.message.ParameterizedMessage;
11-
import org.apache.logging.log4j.util.Supplier;
1211
import org.elasticsearch.action.ActionListener;
1312
import org.elasticsearch.action.index.IndexAction;
1413
import org.elasticsearch.action.index.IndexRequest;
1514
import org.elasticsearch.action.index.IndexResponse;
15+
import org.elasticsearch.action.support.WriteRequest;
1616
import org.elasticsearch.client.Client;
1717
import org.elasticsearch.common.xcontent.ToXContent;
1818
import org.elasticsearch.common.xcontent.XContentBuilder;
1919
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
2020
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
21+
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
22+
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
23+
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
2124

2225
import java.io.IOException;
2326

@@ -33,29 +36,59 @@ public class JobDataCountsPersister {
3336

3437
private static final Logger logger = LogManager.getLogger(JobDataCountsPersister.class);
3538

39+
private final ResultsPersisterService resultsPersisterService;
3640
private final Client client;
41+
private final AnomalyDetectionAuditor auditor;
3742

38-
public JobDataCountsPersister(Client client) {
43+
/**
 * Creates a persister that keeps references to the given collaborators.
 *
 * @param client                  client stored for later use by this persister
 * @param resultsPersisterService service stored for later use by this persister
 * @param auditor                 auditor stored for later use by this persister
 */
public JobDataCountsPersister(Client client, ResultsPersisterService resultsPersisterService, AnomalyDetectionAuditor auditor) {
    this.client = client;
    this.auditor = auditor;
    this.resultsPersisterService = resultsPersisterService;
}
4148

42-
private XContentBuilder serialiseCounts(DataCounts counts) throws IOException {
49+
private static XContentBuilder serialiseCounts(DataCounts counts) throws IOException {
4350
XContentBuilder builder = jsonBuilder();
4451
return counts.toXContent(builder, ToXContent.EMPTY_PARAMS);
4552
}
4653

4754
/**
4855
* Update the job's data counts stats and figures.
56+
* NOTE: This call is synchronous and pauses the calling thread.
57+
* @param jobId Job to update
58+
* @param counts The counts
59+
*/
60+
public void persistDataCounts(String jobId, DataCounts counts) {
61+
try {
62+
resultsPersisterService.indexWithRetry(jobId,
63+
AnomalyDetectorsIndex.resultsWriteAlias(jobId),
64+
counts,
65+
ToXContent.EMPTY_PARAMS,
66+
WriteRequest.RefreshPolicy.NONE,
67+
DataCounts.documentId(jobId),
68+
() -> true,
69+
(msg) -> auditor.warning(jobId, "Job data_counts " + msg));
70+
} catch (IOException ioe) {
71+
logger.error(() -> new ParameterizedMessage("[{}] Failed writing data_counts stats", jobId), ioe);
72+
} catch (Exception ex) {
73+
logger.error(() -> new ParameterizedMessage("[{}] Failed persisting data_counts stats", jobId), ex);
74+
}
75+
}
76+
77+
/**
78+
* The same as {@link JobDataCountsPersister#persistDataCounts(String, DataCounts)} but done asynchronously.
4979
*
80+
* Two differences are:
81+
* - The listener is notified on persistence failure
82+
* - If the persistence fails, it is not automatically retried
5083
* @param jobId Job to update
5184
* @param counts The counts
5285
* @param listener Action response listener
5386
*/
54-
public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
87+
public void persistDataCountsAsync(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
5588
try (XContentBuilder content = serialiseCounts(counts)) {
5689
final IndexRequest request = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId))
57-
.id(DataCounts.documentId(jobId))
58-
.source(content);
90+
.id(DataCounts.documentId(jobId))
91+
.source(content);
5992
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener<IndexResponse>() {
6093
@Override
6194
public void onResponse(IndexResponse indexResponse) {
@@ -68,7 +101,9 @@ public void onFailure(Exception e) {
68101
}
69102
});
70103
} catch (IOException ioe) {
71-
logger.warn((Supplier<?>)() -> new ParameterizedMessage("[{}] Error serialising DataCounts stats", jobId), ioe);
104+
String msg = new ParameterizedMessage("[{}] Failed writing data_counts stats", jobId).getFormattedMessage();
105+
logger.error(msg, ioe);
106+
listener.onFailure(ExceptionsHelper.serverError(msg, ioe));
72107
}
73108
}
74109
}

0 commit comments

Comments
 (0)