Skip to content

[ML] Add graceful retry for anomaly detector result indexing failures #49508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
19c67d6
[ML] Add graceful retry for bulk index results failures
benwtrent Nov 22, 2019
1877d4e
fixing test, removing unused import
benwtrent Nov 25, 2019
6d59172
moving retries to individual actions in `processResult` so result pro…
benwtrent Nov 25, 2019
ed4d9a1
clear out bulk request if not able to persist after retrying
benwtrent Nov 25, 2019
ff2b984
Merge branch 'master' into feature/ml-persist-results-retry
elasticmachine Nov 25, 2019
1bf708e
removing successful bulk response items from subsequent bulk requests
benwtrent Nov 25, 2019
02c220c
Merge branch 'master' into feature/ml-persist-results-retry
benwtrent Dec 3, 2019
791c570
adding retries for all synchronous persistent calls
benwtrent Dec 3, 2019
bf749d7
Fixing refresh policy handling bug and missing settings
benwtrent Dec 4, 2019
0e01259
fixing datafeed timing stats persistence
benwtrent Dec 4, 2019
4710635
rewriting datacounts reporter to use retries
benwtrent Dec 4, 2019
7e4642f
cleanup and addressing pr comments
benwtrent Dec 9, 2019
cb8a5b2
using builder where possible
benwtrent Dec 9, 2019
7814ed2
adjusing backoff calculation
benwtrent Dec 9, 2019
fbd717a
Merge branch 'master' into feature/ml-persist-results-retry
elasticmachine Dec 9, 2019
724dfcf
disallow int wrapping
benwtrent Dec 9, 2019
cb03f38
Merge branch 'feature/ml-persist-results-retry' of github.com:benwtre…
benwtrent Dec 9, 2019
756be6c
Adding auditor messaging indicating failures and retries
benwtrent Dec 9, 2019
6e1a25a
addressing PR comments
benwtrent Dec 10, 2019
a9fba44
Merge branch 'master' into feature/ml-persist-results-retry
elasticmachine Dec 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.junit.After;
import org.junit.Before;

import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
import static org.hamcrest.Matchers.greaterThan;

public class BulkFailureRetryIT extends MlNativeAutodetectIntegTestCase {

private String index = "bulk-failure-retry";
private long now = System.currentTimeMillis();
private String jobId = "bulk-failure-retry-job";
private String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job";

@Before
public void putPastDataIntoIndex() {
client().admin().indices().prepareCreate(index)
.addMapping("type", "time", "type=date", "value", "type=long")
.get();
long oneDayAgo = now - 86400000;
long twoDaysAgo = oneDayAgo - 86400000;
writeData(logger, index, 128, twoDaysAgo, oneDayAgo);
}

@After
public void cleanUpTest() {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it have a potential of affecting other, unrelated tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see the @After clause. It sets all back to null

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this part. My question was more along the lines if it is possible that 2 test classes will share these cluster settings. But I guess it's not the case.

.putNull("logger.org.elasticsearch.xpack.ml")
.putNull("xpack.ml.persist_results_max_retries")
.build()).get();
cleanUp();
}

private void ensureAnomaliesWrite() throws InterruptedException {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(
listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
acknowledgedResponseHolder,
exceptionHolder);
if (exceptionHolder.get() != null) {
logger.error("FAILED TO MARK ["+ resultsIndex + "] as read-write again", exceptionHolder.get());
}
}

private void setAnomaliesReadOnlyBlock() throws InterruptedException {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(
listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener),
acknowledgedResponseHolder,
exceptionHolder);
if (exceptionHolder.get() != null) {
logger.error("FAILED TO MARK ["+ resultsIndex + "] as read-write again", exceptionHolder.get());
}
}

public void testBulkFailureRetries() throws Exception {
Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
job.setResultsIndexName(jobId);

DatafeedConfig.Builder datafeedConfigBuilder =
createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
registerJob(job);
putJob(job);
openJob(job.getId());
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, now - 86400000);
waitUntilJobIsClosed(jobId);

// Get the job stats
Bucket initialLatestBucket = getLatestFinalizedBucket(jobId);
assertThat(initialLatestBucket.getEpoch(), greaterThan(0L));

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put("logger.org.elasticsearch.xpack.ml", "TRACE")
.put("xpack.ml.persist_results_max_retries", "10000")
.build()).get();

setAnomaliesReadOnlyBlock();

int moreDocs = 128;
long oneDayAgo = now - 86400000;
writeData(logger, index, moreDocs, oneDayAgo, now);

openJob(job.getId());
startDatafeed(datafeedConfig.getId(), oneDayAgo, now);

// TODO Any better way?????
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure of a way to ask about the internal state...I wonder if we can read the node logs to see if there is an entry indicating that the bulk index failed?

Thread.sleep(1000);
ensureAnomaliesWrite();
waitUntilJobIsClosed(jobId);

Bucket newLatestBucket = getLatestFinalizedBucket(jobId);
assertThat(newLatestBucket.getEpoch(), greaterThan(initialLatestBucket.getEpoch()));
}

private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
return createJob(id, bucketSpan, function, field, null);
}

private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeField("time");
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);

Detector.Builder d = new Detector.Builder(function, field);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
analysisConfig.setBucketSpan(bucketSpan);
analysisConfig.setSummaryCountFieldName(summaryCountField);

Job.Builder builder = new Job.Builder();
builder.setId(id);
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
return builder;
}

private void writeData(Logger logger, String index, long numDocs, long start, long end) {
int maxDelta = (int) (end - start - 1);
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < numDocs; i++) {
IndexRequest indexRequest = new IndexRequest(index);
long timestamp = start + randomIntBetween(0, maxDelta);
assert timestamp >= start && timestamp < end;
indexRequest.source("time", timestamp, "value", i);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
if (bulkResponse.hasFailures()) {
int failures = 0;
for (BulkItemResponse itemResponse : bulkResponse) {
if (itemResponse.isFailed()) {
failures++;
logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
}
}
fail("Bulk response contained " + failures + " failures");
}
logger.info("Indexed [{}] documents", numDocs);
}

private Bucket getLatestFinalizedBucket(String jobId) {
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
getBucketsRequest.setExcludeInterim(true);
getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName());
getBucketsRequest.setDescending(true);
getBucketsRequest.setPageParams(new PageParams(0, 1));
return getBuckets(getBucketsRequest).get(0);
}

private <T> void blockingCall(Consumer<ActionListener<T>> function,
AtomicReference<T> response,
AtomicReference<Exception> error) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ActionListener<T> listener = ActionListener.wrap(
r -> {
response.set(r);
latch.countDown();
},
e -> {
error.set(e);
latch.countDown();
}
);

function.accept(listener);
latch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.NativeAutodetectProcessFactory;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.normalizer.MultiplyingNormalizerProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
Expand Down Expand Up @@ -446,7 +447,8 @@ public List<Setting<?>> getSettings() {
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION,
InferenceProcessor.MAX_INFERENCE_PROCESSORS,
ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE,
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
AutodetectResultProcessor.PERSIST_RESULTS_MAX_RETRIES
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private Builder(String jobId) {
* @param bucket The bucket to persist
* @return this
*/
public Builder persistBucket(Bucket bucket) {
public Builder persistBucket(Bucket bucket) throws BulkIndexException {
// If the supplied bucket has records then create a copy with records
// removed, because we never persist nested records in buckets
Bucket bucketWithoutRecords = bucket;
Expand All @@ -114,7 +114,7 @@ public Builder persistBucket(Bucket bucket) {
return this;
}

private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers) {
private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers) throws BulkIndexException {
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
String id = bucketInfluencer.getId();
Expand All @@ -130,7 +130,7 @@ private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluen
* @param timingStats timing stats to persist
* @return this
*/
public Builder persistTimingStats(TimingStats timingStats) {
public Builder persistTimingStats(TimingStats timingStats) throws BulkIndexException {
indexResult(
TimingStats.documentId(timingStats.getJobId()),
timingStats,
Expand All @@ -145,7 +145,7 @@ public Builder persistTimingStats(TimingStats timingStats) {
* @param records the records to persist
* @return this
*/
public Builder persistRecords(List<AnomalyRecord> records) {
public Builder persistRecords(List<AnomalyRecord> records) throws BulkIndexException {
for (AnomalyRecord record : records) {
logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, record.getId());
indexResult(record.getId(), record, "record");
Expand All @@ -161,7 +161,7 @@ public Builder persistRecords(List<AnomalyRecord> records) {
* @param influencers the influencers to persist
* @return this
*/
public Builder persistInfluencers(List<Influencer> influencers) {
public Builder persistInfluencers(List<Influencer> influencers) throws BulkIndexException {
for (Influencer influencer : influencers) {
logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, influencer.getId());
indexResult(influencer.getId(), influencer, "influencer");
Expand All @@ -170,30 +170,30 @@ public Builder persistInfluencers(List<Influencer> influencers) {
return this;
}

public Builder persistModelPlot(ModelPlot modelPlot) {
public Builder persistModelPlot(ModelPlot modelPlot) throws BulkIndexException {
logger.trace("[{}] ES BULK ACTION: index model plot to index [{}] with ID [{}]", jobId, indexName, modelPlot.getId());
indexResult(modelPlot.getId(), modelPlot, "model plot");
return this;
}

public Builder persistForecast(Forecast forecast) {
public Builder persistForecast(Forecast forecast) throws BulkIndexException {
logger.trace("[{}] ES BULK ACTION: index forecast to index [{}] with ID [{}]", jobId, indexName, forecast.getId());
indexResult(forecast.getId(), forecast, Forecast.RESULT_TYPE_VALUE);
return this;
}

public Builder persistForecastRequestStats(ForecastRequestStats forecastRequestStats) {
public Builder persistForecastRequestStats(ForecastRequestStats forecastRequestStats) throws BulkIndexException {
logger.trace("[{}] ES BULK ACTION: index forecast request stats to index [{}] with ID [{}]", jobId, indexName,
forecastRequestStats.getId());
indexResult(forecastRequestStats.getId(), forecastRequestStats, Forecast.RESULT_TYPE_VALUE);
return this;
}

private void indexResult(String id, ToXContent resultDoc, String resultType) {
private void indexResult(String id, ToXContent resultDoc, String resultType) throws BulkIndexException {
indexResult(id, resultDoc, ToXContent.EMPTY_PARAMS, resultType);
}

private void indexResult(String id, ToXContent resultDoc, ToXContent.Params params, String resultType) {
private void indexResult(String id, ToXContent resultDoc, ToXContent.Params params, String resultType) throws BulkIndexException {
try (XContentBuilder content = toXContentBuilder(resultDoc, params)) {
bulkRequest.add(new IndexRequest(indexName).id(id).source(content));
} catch (IOException e) {
Expand All @@ -208,7 +208,7 @@ private void indexResult(String id, ToXContent resultDoc, ToXContent.Params para
/**
* Execute the bulk action
*/
public void executeRequest() {
public void executeRequest() throws BulkIndexException {
if (bulkRequest.numberOfActions() == 0) {
return;
}
Expand All @@ -218,6 +218,7 @@ public void executeRequest() {
BulkResponse addRecordsResponse = client.bulk(bulkRequest).actionGet();
if (addRecordsResponse.hasFailures()) {
logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
throw new BulkIndexException(addRecordsResponse);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it best to throw here, and handle retries up the stack. That way the retries know about the processor state and can stop retrying if the processor died (or is dying).

}
}

Expand Down Expand Up @@ -411,4 +412,16 @@ private void logCall(String indexName) {
}
}
}

public static class BulkIndexException extends Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need both this exception class and the one defined in ResultsPersisterService?


public BulkIndexException(String msg) {
super(msg);
}

public BulkIndexException(BulkResponse bulkResponse) {
this(bulkResponse.buildFailureMessage());
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,23 @@ public TimingStats getCurrentTimingStats() {
return new TimingStats(currentTimingStats);
}

public void reportBucket(Bucket bucket) {
public void reportBucket(Bucket bucket) throws JobResultsPersister.BulkIndexException {
currentTimingStats.updateStats(bucket.getProcessingTimeMs());
currentTimingStats.setLatestRecordTimestamp(bucket.getTimestamp().toInstant().plusSeconds(bucket.getBucketSpan()));
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
flush();
}
}

public void finishReporting() {
public void finishReporting() throws JobResultsPersister.BulkIndexException {
// Don't flush if current timing stats are identical to the persisted ones
if (currentTimingStats.equals(persistedTimingStats)) {
return;
}
flush();
}

private void flush() {
private void flush() throws JobResultsPersister.BulkIndexException {
persistedTimingStats = new TimingStats(currentTimingStats);
bulkResultsPersister.persistTimingStats(persistedTimingStats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
Expand Down Expand Up @@ -235,7 +236,15 @@ public void finishReporting(ActionListener<Boolean> listener) {
totalRecordStats.setLastDataTimeStamp(now);
diagnostics.flush();
retrieveDiagnosticsIntermediateResults();
dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), listener);
dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), ActionListener.wrap(
listener::onResponse,
e -> {
// Recording data counts should not cause the job processing to fail.
// Log the failure and move on.
logger.warn(() -> new ParameterizedMessage("[{}] failed to record data counts", job.getId()), e);
listener.onResponse(true);
}
));
}

/**
Expand Down
Loading