
[7.x][ML] Add graceful retry for anomaly detector result indexing failures (#49508) #50145


Merged: 2 commits, Dec 12, 2019
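
This change (a backport of #49508) makes ML anomaly result indexing failures retryable: result writes go through the new ResultsPersisterService, and the number of attempts is capped by the new cluster setting xpack.ml.persist_results_max_retries. A rough usage sketch for tuning the retry budget at runtime (the setting name comes from this diff; treating it as a dynamically updatable transient setting is an assumption based on how the test below toggles it):

    // Sketch: raise the ML result-persistence retry budget via cluster settings.
    client().admin().cluster().prepareUpdateSettings()
        .setTransientSettings(Settings.builder()
            .put("xpack.ml.persist_results_max_retries", 15)
            .build())
        .get();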

BulkFailureRetryIT.java (new file)

@@ -0,0 +1,214 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.integration;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.junit.After;
import org.junit.Before;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
import static org.hamcrest.Matchers.greaterThan;

public class BulkFailureRetryIT extends MlNativeAutodetectIntegTestCase {

    private final String index = "bulk-failure-retry";
    private final long now = System.currentTimeMillis();
    private static final long DAY = Duration.ofDays(1).toMillis();
    private final String jobId = "bulk-failure-retry-job";
    private final String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job";

    @Before
    public void putPastDataIntoIndex() {
        client().admin().indices().prepareCreate(index)
            .addMapping("type", "time", "type=date", "value", "type=long")
            .get();
        long twoDaysAgo = now - DAY * 2;
        long threeDaysAgo = now - DAY * 3;
        writeData(logger, index, 250, threeDaysAgo, twoDaysAgo);
    }

    @After
    public void cleanUpTest() {
        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(Settings.builder()
                .putNull("xpack.ml.persist_results_max_retries")
                .putNull("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob")
                .putNull("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister")
                .putNull("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output")
                .build()).get();
        cleanUp();
    }

    // Removes the read-only block from the results index so writes succeed again.
    private void ensureAnomaliesWrite() throws InterruptedException {
        Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
        AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
        AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
        blockingCall(
            listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
            acknowledgedResponseHolder,
            exceptionHolder);
        if (exceptionHolder.get() != null) {
            fail("FAILED TO MARK [" + resultsIndex + "] as read-write again: " + exceptionHolder.get());
        }
    }

    // Sets the read-only block on the results index so bulk writes fail,
    // forcing the persister into its retry path.
    private void setAnomaliesReadOnlyBlock() throws InterruptedException {
        Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
        AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
        AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
        blockingCall(
            listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
            acknowledgedResponseHolder,
            exceptionHolder);
        if (exceptionHolder.get() != null) {
            fail("FAILED TO MARK [" + resultsIndex + "] as read-ONLY: " + exceptionHolder.get());
        }
    }
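
    // End-to-end retry scenario: run a datafeed to completion, set the results
    // index read-only so result writes fail, feed more data while writes are
    // blocked, then restore write access and verify that retried results
    // eventually produce newer buckets.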

    public void testBulkFailureRetries() throws Exception {
        Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
        job.setResultsIndexName(jobId);

        DatafeedConfig.Builder datafeedConfigBuilder =
            createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
        registerJob(job);
        putJob(job);
        openJob(job.getId());
        registerDatafeed(datafeedConfig);
        putDatafeed(datafeedConfig);
        long twoDaysAgo = now - 2 * DAY;
        startDatafeed(datafeedConfig.getId(), 0L, twoDaysAgo);
        waitUntilJobIsClosed(jobId);

        // Capture the latest finalized bucket before writes are blocked
        Bucket initialLatestBucket = getLatestFinalizedBucket(jobId);
        assertThat(initialLatestBucket.getEpoch(), greaterThan(0L));

        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(Settings.builder()
                .put("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob", "TRACE")
                .put("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister", "TRACE")
                .put("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output", "TRACE")
                .put("xpack.ml.persist_results_max_retries", "15")
                .build()).get();

        setAnomaliesReadOnlyBlock();

        int moreDocs = 1_000;
        writeData(logger, index, moreDocs, twoDaysAgo, now);

        openJob(job.getId());
        startDatafeed(datafeedConfig.getId(), twoDaysAgo, now);

        // Lift the block; pending retries should now succeed
        ensureAnomaliesWrite();
        waitUntilJobIsClosed(jobId);

        Bucket newLatestBucket = getLatestFinalizedBucket(jobId);
        assertThat(newLatestBucket.getEpoch(), greaterThan(initialLatestBucket.getEpoch()));
    }

    private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
        return createJob(id, bucketSpan, function, field, null);
    }

    private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
        DataDescription.Builder dataDescription = new DataDescription.Builder();
        dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
        dataDescription.setTimeField("time");
        dataDescription.setTimeFormat(DataDescription.EPOCH_MS);

        Detector.Builder d = new Detector.Builder(function, field);
        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()))
            .setBucketSpan(bucketSpan)
            .setSummaryCountFieldName(summaryCountField);

        return new Job.Builder().setId(id).setAnalysisConfig(analysisConfig).setDataDescription(dataDescription);
    }

    private void writeData(Logger logger, String index, long numDocs, long start, long end) {
        int maxDelta = (int) (end - start - 1);
        BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
        for (int i = 0; i < numDocs; i++) {
            IndexRequest indexRequest = new IndexRequest(index);
            long timestamp = start + randomIntBetween(0, maxDelta);
            assert timestamp >= start && timestamp < end;
            indexRequest.source("time", timestamp, "value", i);
            bulkRequestBuilder.add(indexRequest);
        }
        BulkResponse bulkResponse = bulkRequestBuilder
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
            .get();
        if (bulkResponse.hasFailures()) {
            int failures = 0;
            for (BulkItemResponse itemResponse : bulkResponse) {
                if (itemResponse.isFailed()) {
                    failures++;
                    logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
                }
            }
            fail("Bulk response contained " + failures + " failures");
        }
        logger.info("Indexed [{}] documents", numDocs);
    }

    private Bucket getLatestFinalizedBucket(String jobId) {
        GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
        getBucketsRequest.setExcludeInterim(true);
        getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName());
        getBucketsRequest.setDescending(true);
        getBucketsRequest.setPageParams(new PageParams(0, 1));
        return getBuckets(getBucketsRequest).get(0);
    }

    // Adapts an async client call into a blocking call for test assertions:
    // the latch releases once either the response or the exception is captured.
    private <T> void blockingCall(Consumer<ActionListener<T>> function,
                                  AtomicReference<T> response,
                                  AtomicReference<Exception> error) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ActionListener<T> listener = ActionListener.wrap(
            r -> {
                response.set(r);
                latch.countDown();
            },
            e -> {
                error.set(e);
                latch.countDown();
            }
        );

        function.accept(listener);
        latch.await();
    }
}

MachineLearning.java

@@ -295,6 +295,7 @@
import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

import java.io.IOException;
import java.math.BigInteger;
@@ -445,7 +446,8 @@ public List<Setting<?>> getSettings() {
            MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION,
            InferenceProcessor.MAX_INFERENCE_PROCESSORS,
            ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE,
            ModelLoadingService.INFERENCE_MODEL_CACHE_TTL));
            ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
            ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES));
    }
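
    // Sketch (not from this diff): a plausible declaration for the setting
    // registered above. The setting name is taken from the test; the default,
    // bounds, and Dynamic/NodeScope properties are assumptions, since the real
    // definition lives in ResultsPersisterService and is not shown here.
    //
    // public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES =
    //     Setting.intSetting("xpack.ml.persist_results_max_retries", 20, 0,
    //         Setting.Property.Dynamic, Setting.Property.NodeScope);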

    public Settings additionalSettings() {
@@ -518,9 +520,12 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
        DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
        InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName());
        this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
        ResultsPersisterService resultsPersisterService = new ResultsPersisterService(client, clusterService, settings);
        JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
        JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
        JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
        JobResultsPersister jobResultsPersister = new JobResultsPersister(client, resultsPersisterService, anomalyDetectionAuditor);
        JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client,
            resultsPersisterService,
            anomalyDetectionAuditor);
        JobConfigProvider jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
        DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
        UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(client, clusterService, threadPool);

TransportRevertModelSnapshotAction.java

@@ -170,7 +170,7 @@ private ActionListener<RevertModelSnapshotAction.Response> wrapRevertDataCountsListener(
        return ActionListener.wrap(response -> {
            jobResultsProvider.dataCounts(jobId, counts -> {
                counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp());
                jobDataCountsPersister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() {
                jobDataCountsPersister.persistDataCountsAsync(jobId, counts, new ActionListener<Boolean>() {
                    @Override
                    public void onResponse(Boolean aBoolean) {
                        listener.onResponse(response);

JobDataCountsPersister.java

@@ -8,16 +8,19 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

import java.io.IOException;

@@ -33,29 +36,59 @@ public class JobDataCountsPersister {

    private static final Logger logger = LogManager.getLogger(JobDataCountsPersister.class);

    private final ResultsPersisterService resultsPersisterService;
    private final Client client;
    private final AnomalyDetectionAuditor auditor;

    public JobDataCountsPersister(Client client) {
    public JobDataCountsPersister(Client client, ResultsPersisterService resultsPersisterService, AnomalyDetectionAuditor auditor) {
        this.resultsPersisterService = resultsPersisterService;
        this.client = client;
        this.auditor = auditor;
    }

    private XContentBuilder serialiseCounts(DataCounts counts) throws IOException {
    private static XContentBuilder serialiseCounts(DataCounts counts) throws IOException {
        XContentBuilder builder = jsonBuilder();
        return counts.toXContent(builder, ToXContent.EMPTY_PARAMS);
    }

    /**
     * Update the job's data counts stats and figures.
     * NOTE: This call is synchronous and pauses the calling thread.
     * @param jobId Job to update
     * @param counts The counts
     */
    public void persistDataCounts(String jobId, DataCounts counts) {
        try {
            resultsPersisterService.indexWithRetry(jobId,
                AnomalyDetectorsIndex.resultsWriteAlias(jobId),
                counts,
                ToXContent.EMPTY_PARAMS,
                WriteRequest.RefreshPolicy.NONE,
                DataCounts.documentId(jobId),
                () -> true,
                (msg) -> auditor.warning(jobId, "Job data_counts " + msg));
        } catch (IOException ioe) {
            logger.error(() -> new ParameterizedMessage("[{}] Failed writing data_counts stats", jobId), ioe);
        } catch (Exception ex) {
            logger.error(() -> new ParameterizedMessage("[{}] Failed persisting data_counts stats", jobId), ex);
        }
    }
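
    // Context (assumption, not shown in this diff): ResultsPersisterService.indexWithRetry
    // is expected to retry the index request up to the configured
    // xpack.ml.persist_results_max_retries limit, to keep retrying only while
    // the supplied shouldRetry supplier returns true, and to pass a progress
    // message to the handler (here, the anomaly-detection auditor) on each
    // failed attempt.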

    /**
     * The same as {@link JobDataCountsPersister#persistDataCounts(String, DataCounts)} but done asynchronously.
     *
     * Two differences are:
     *  - The listener is notified on persistence failure
     *  - If the persistence fails, it is not automatically retried
     * @param jobId Job to update
     * @param counts The counts
     * @param listener Action response listener
     */
    public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
    public void persistDataCountsAsync(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
        try (XContentBuilder content = serialiseCounts(counts)) {
            final IndexRequest request = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId))
                .id(DataCounts.documentId(jobId))
                .source(content);
            executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener<IndexResponse>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {
@@ -68,7 +101,9 @@ public void onFailure(Exception e) {
                }
            });
        } catch (IOException ioe) {
            logger.warn((Supplier<?>)() -> new ParameterizedMessage("[{}] Error serialising DataCounts stats", jobId), ioe);
            String msg = new ParameterizedMessage("[{}] Failed writing data_counts stats", jobId).getFormattedMessage();
            logger.error(msg, ioe);
            listener.onFailure(ExceptionsHelper.serverError(msg, ioe));
        }
    }
}
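
A minimal caller sketch for the asynchronous variant (hypothetical call site, not taken from this PR; the listener handling is illustrative):

    jobDataCountsPersister.persistDataCountsAsync(jobId, counts, ActionListener.wrap(
        persisted -> logger.trace("[{}] data_counts persisted", jobId),
        e -> logger.warn(new ParameterizedMessage("[{}] data_counts persistence failed", jobId), e)));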