diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java new file mode 100644 index 0000000000000..6dcf306b04a39 --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java @@ -0,0 +1,214 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.action.util.PageParams; +import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.junit.After; +import org.junit.Before; + +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder; +import static org.hamcrest.Matchers.greaterThan; + +public class BulkFailureRetryIT extends MlNativeAutodetectIntegTestCase { + + private final String index = "bulk-failure-retry"; + private long now = System.currentTimeMillis(); + private static long DAY = Duration.ofDays(1).toMillis(); + private final String jobId = "bulk-failure-retry-job"; + private final String resultsIndex = ".ml-anomalies-custom-bulk-failure-retry-job"; + + @Before + public void putPastDataIntoIndex() { + client().admin().indices().prepareCreate(index) + .addMapping("type", "time", "type=date", "value", "type=long") + .get(); + long twoDaysAgo = now - DAY * 2; + long threeDaysAgo = now - DAY * 3; + writeData(logger, index, 250, threeDaysAgo, twoDaysAgo); + } + + @After + public void cleanUpTest() { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .putNull("xpack.ml.persist_results_max_retries") + .putNull("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob") + .putNull("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister") + .putNull("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output") + .build()).get(); + cleanUp(); + } + + private void ensureAnomaliesWrite() throws InterruptedException { + Settings 
settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build(); + AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>(); + AtomicReference<Exception> exceptionHolder = new AtomicReference<>(); + blockingCall( + listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener), + acknowledgedResponseHolder, + exceptionHolder); + if (exceptionHolder.get() != null) { + fail("FAILED TO MARK [" + resultsIndex + "] as read-write again: " + exceptionHolder.get()); + } + } + + private void setAnomaliesReadOnlyBlock() throws InterruptedException { + Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build(); + AtomicReference<AcknowledgedResponse> acknowledgedResponseHolder = new AtomicReference<>(); + AtomicReference<Exception> exceptionHolder = new AtomicReference<>(); + blockingCall( + listener -> client().admin().indices().prepareUpdateSettings(resultsIndex).setSettings(settings).execute(listener), + acknowledgedResponseHolder, + exceptionHolder); + if (exceptionHolder.get() != null) { + fail("FAILED TO MARK [" + resultsIndex + "] as read-only: " + exceptionHolder.get()); + } + } + + public void testBulkFailureRetries() throws Exception { + Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null); + job.setResultsIndexName(jobId); + + DatafeedConfig.Builder datafeedConfigBuilder = + createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); + DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); + registerJob(job); + putJob(job); + openJob(job.getId()); + registerDatafeed(datafeedConfig); + putDatafeed(datafeedConfig); + long twoDaysAgo = now - 2 * DAY; + startDatafeed(datafeedConfig.getId(), 0L, twoDaysAgo); + waitUntilJobIsClosed(jobId); + + // Get the latest finalized bucket before blocking the results index + Bucket initialLatestBucket = getLatestFinalizedBucket(jobId); + assertThat(initialLatestBucket.getEpoch(), greaterThan(0L)); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .put("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob", "TRACE") + .put("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister", "TRACE") + .put("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output", "TRACE") + .put("xpack.ml.persist_results_max_retries", "15") + .build()).get(); + + setAnomaliesReadOnlyBlock(); + + int moreDocs = 1_000; + writeData(logger, index, moreDocs, twoDaysAgo, now); + + openJob(job.getId()); + startDatafeed(datafeedConfig.getId(), twoDaysAgo, now); + + ensureAnomaliesWrite(); + waitUntilJobIsClosed(jobId); + + Bucket newLatestBucket = getLatestFinalizedBucket(jobId); + assertThat(newLatestBucket.getEpoch(), greaterThan(initialLatestBucket.getEpoch())); + } + + private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) { + return createJob(id, bucketSpan, function, field, null); + } + + private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); + dataDescription.setTimeField("time"); + dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + + Detector.Builder d = new Detector.Builder(function, field); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())) + .setBucketSpan(bucketSpan) +
.setSummaryCountFieldName(summaryCountField); + + return new Job.Builder().setId(id).setAnalysisConfig(analysisConfig).setDataDescription(dataDescription); + } + + private void writeData(Logger logger, String index, long numDocs, long start, long end) { + int maxDelta = (int) (end - start - 1); + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + for (int i = 0; i < numDocs; i++) { + IndexRequest indexRequest = new IndexRequest(index); + long timestamp = start + randomIntBetween(0, maxDelta); + assert timestamp >= start && timestamp < end; + indexRequest.source("time", timestamp, "value", i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + if (bulkResponse.hasFailures()) { + int failures = 0; + for (BulkItemResponse itemResponse : bulkResponse) { + if (itemResponse.isFailed()) { + failures++; + logger.error("Item response failure [{}]", itemResponse.getFailureMessage()); + } + } + fail("Bulk response contained " + failures + " failures"); + } + logger.info("Indexed [{}] documents", numDocs); + } + + private Bucket getLatestFinalizedBucket(String jobId) { + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId); + getBucketsRequest.setExcludeInterim(true); + getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName()); + getBucketsRequest.setDescending(true); + getBucketsRequest.setPageParams(new PageParams(0, 1)); + return getBuckets(getBucketsRequest).get(0); + } + + private <T> void blockingCall(Consumer<ActionListener<T>> function, + AtomicReference<T> response, + AtomicReference<Exception> error) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + ActionListener<T> listener = ActionListener.wrap( + r -> { + response.set(r); + latch.countDown(); + }, + e -> { + error.set(e); + latch.countDown(); + } + ); + + function.accept(listener); + latch.await(); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 76442812ef256..a7b21bbd721ad 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -298,6 +298,7 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import java.io.IOException; import java.math.BigInteger; @@ -446,7 +447,8 @@ public List<Setting<?>> getSettings() { MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION, InferenceProcessor.MAX_INFERENCE_PROCESSORS, ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE, - ModelLoadingService.INFERENCE_MODEL_CACHE_TTL + ModelLoadingService.INFERENCE_MODEL_CACHE_TTL, + ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES ); } @@ -520,9 +522,12 @@ public Collection<Object> createComponents(Client client, ClusterService cluster DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName()); InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName()); this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor); + ResultsPersisterService resultsPersisterService = new ResultsPersisterService(client, clusterService, settings);
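For orientation, here is a minimal sketch (not part of the diff) of how a component wired with the `ResultsPersisterService` constructed here is expected to persist a document with retries. The call matches the `indexWithRetry` signature introduced in this PR; the wrapper class and method are hypothetical stand-ins for real callers such as `JobDataCountsPersister`:

```java
import java.io.IOException;

import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

// Hypothetical caller, for illustration only.
class ResultsPersisterUsageSketch {

    // Persist a job's DataCounts, retrying failed bulk items until the supplier
    // says to stop, and routing retry notices to the job's audit log.
    void persistWithRetry(ResultsPersisterService service, AnomalyDetectionAuditor auditor,
                          String jobId, DataCounts counts) throws IOException {
        service.indexWithRetry(jobId,
            AnomalyDetectorsIndex.resultsWriteAlias(jobId), // write alias of the results index
            counts,                                         // any ToXContent object
            ToXContent.EMPTY_PARAMS,
            WriteRequest.RefreshPolicy.NONE,
            DataCounts.documentId(jobId),                   // document id
            () -> true,                                     // keep retrying while this returns true
            msg -> auditor.warning(jobId, "Job data_counts " + msg)); // retry notices
    }
}
```

Callers that wrap a native process pass a liveness check (for example `this::isAlive` in `AutodetectResultProcessor` below) as the retry supplier, so retries stop as soon as the process dies.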
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings); - JobResultsPersister jobResultsPersister = new JobResultsPersister(client); - JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client); + JobResultsPersister jobResultsPersister = new JobResultsPersister(client, resultsPersisterService, anomalyDetectionAuditor); + JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client, + resultsPersisterService, + anomalyDetectionAuditor); JobConfigProvider jobConfigProvider = new JobConfigProvider(client, xContentRegistry); DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry); UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(client, clusterService, threadPool); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index 7b394b580044f..0406f7b9e3370 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -171,7 +171,7 @@ private ActionListener<RevertModelSnapshotAction.Response> wrapRevertDataCountsL return ActionListener.wrap(response -> { jobResultsProvider.dataCounts(jobId, counts -> { counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp()); - jobDataCountsPersister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() { + jobDataCountsPersister.persistDataCountsAsync(jobId, counts, new ActionListener<Boolean>() { @Override public void onResponse(Boolean aBoolean) { listener.onResponse(response); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java index 9866e3f56af19..4925d2c2ac035 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java @@ -8,16 +8,19 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import java.io.IOException; @@ -33,30 +36,60 @@ public class JobDataCountsPersister { private static final Logger logger = LogManager.getLogger(JobDataCountsPersister.class); + private final ResultsPersisterService resultsPersisterService; private final Client client; + private final AnomalyDetectionAuditor
auditor; - public JobDataCountsPersister(Client client) { + public JobDataCountsPersister(Client client, ResultsPersisterService resultsPersisterService, AnomalyDetectionAuditor auditor) { + this.resultsPersisterService = resultsPersisterService; this.client = client; + this.auditor = auditor; } - private XContentBuilder serialiseCounts(DataCounts counts) throws IOException { + private static XContentBuilder serialiseCounts(DataCounts counts) throws IOException { XContentBuilder builder = jsonBuilder(); return counts.toXContent(builder, ToXContent.EMPTY_PARAMS); } /** * Update the job's data counts stats and figures. + * NOTE: This call is synchronous and pauses the calling thread. + * @param jobId Job to update + * @param counts The counts + */ + public void persistDataCounts(String jobId, DataCounts counts) { + try { + resultsPersisterService.indexWithRetry(jobId, + AnomalyDetectorsIndex.resultsWriteAlias(jobId), + counts, + ToXContent.EMPTY_PARAMS, + WriteRequest.RefreshPolicy.NONE, + DataCounts.documentId(jobId), + () -> true, + (msg) -> auditor.warning(jobId, "Job data_counts " + msg)); + } catch (IOException ioe) { + logger.error(() -> new ParameterizedMessage("[{}] Failed writing data_counts stats", jobId), ioe); + } catch (Exception ex) { + logger.error(() -> new ParameterizedMessage("[{}] Failed persisting data_counts stats", jobId), ex); + } + } + + /** + * The same as {@link JobDataCountsPersister#persistDataCounts(String, DataCounts)} but done asynchronously. * + * Two differences are: + * - The listener is notified on persistence failure + * - If the persistence fails, it is not automatically retried * @param jobId Job to update * @param counts The counts * @param listener ActionType response listener */ - public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) { + public void persistDataCountsAsync(String jobId, DataCounts counts, ActionListener<Boolean> listener) { try (XContentBuilder content = serialiseCounts(counts)) { final IndexRequest request = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId)) - .id(DataCounts.documentId(jobId)) - .source(content); - executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener<IndexResponse>() { + .id(DataCounts.documentId(jobId)) + .source(content); + executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener<>() { @Override public void onResponse(IndexResponse indexResponse) { listener.onResponse(true); @@ -68,7 +101,9 @@ public void onFailure(Exception e) { } }); } catch (IOException ioe) { - logger.warn((Supplier<?>)() -> new ParameterizedMessage("[{}] Error serialising DataCounts stats", jobId), ioe); + String msg = new ParameterizedMessage("[{}] Failed writing data_counts stats", jobId).getFormattedMessage(); + logger.error(msg, ioe); + listener.onFailure(ExceptionsHelper.serverError(msg, ioe)); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 783706259a17b..b9a3fbaa570bb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -8,16 +8,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import
org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse.Result; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -39,11 +39,14 @@ import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.function.Supplier; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -70,24 +73,34 @@ public class JobResultsPersister { private static final Logger logger = LogManager.getLogger(JobResultsPersister.class); private final Client client; + private final ResultsPersisterService resultsPersisterService; + private final AnomalyDetectionAuditor auditor; - public JobResultsPersister(Client client) { + public JobResultsPersister(Client client, + ResultsPersisterService resultsPersisterService, + AnomalyDetectionAuditor auditor) { this.client = client; + this.resultsPersisterService = resultsPersisterService; + this.auditor = auditor; } - public Builder bulkPersisterBuilder(String jobId) { - return new Builder(jobId); + public Builder bulkPersisterBuilder(String jobId, Supplier<Boolean> shouldRetry) { + return new Builder(jobId, resultsPersisterService, shouldRetry); } public class Builder { private BulkRequest bulkRequest; private final String jobId; private final String indexName; + private final Supplier<Boolean> shouldRetry; + private final ResultsPersisterService resultsPersisterService; - private Builder(String jobId) { + private Builder(String jobId, ResultsPersisterService resultsPersisterService, Supplier<Boolean> shouldRetry) { this.jobId = Objects.requireNonNull(jobId); indexName = AnomalyDetectorsIndex.resultsWriteAlias(jobId); bulkRequest = new BulkRequest(); + this.shouldRetry = shouldRetry; + this.resultsPersisterService = resultsPersisterService; } /** @@ -213,14 +226,9 @@ public void executeRequest() { return; } logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions()); - - try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { - BulkResponse addRecordsResponse = client.bulk(bulkRequest).actionGet(); - if (addRecordsResponse.hasFailures()) { - logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); - } - } - + resultsPersisterService.bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, (msg) -> { + auditor.warning(jobId, "Bulk indexing of results failed " + msg); + }); bulkRequest = new BulkRequest(); } @@
-235,10 +243,10 @@ BulkRequest getBulkRequest() { * * @param category The category to be persisted */ - public void persistCategoryDefinition(CategoryDefinition category) { + public void persistCategoryDefinition(CategoryDefinition category, Supplier<Boolean> shouldRetry) { Persistable persistable = new Persistable(category.getJobId(), category, category.getId()); - persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId())).actionGet(); + persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId()), shouldRetry); // Don't commit as we expect masses of these updates and they're not // read again by this process } @@ -246,9 +254,9 @@ public void persistCategoryDefinition(CategoryDefinition category) { /** * Persist the quantiles (blocking) */ - public void persistQuantiles(Quantiles quantiles) { + public void persistQuantiles(Quantiles quantiles, Supplier<Boolean> shouldRetry) { Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId())); - persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).actionGet(); + persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), shouldRetry); } /** @@ -263,20 +271,22 @@ public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy ref /** * Persist a model snapshot description */ - public IndexResponse persistModelSnapshot(ModelSnapshot modelSnapshot, WriteRequest.RefreshPolicy refreshPolicy) { + public BulkResponse persistModelSnapshot(ModelSnapshot modelSnapshot, + WriteRequest.RefreshPolicy refreshPolicy, + Supplier<Boolean> shouldRetry) { Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.documentId(modelSnapshot)); persistable.setRefreshPolicy(refreshPolicy); - return persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet(); + return persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId()), shouldRetry); } /** * Persist the memory usage data (blocking) */ - public void persistModelSizeStats(ModelSizeStats modelSizeStats) { + public void persistModelSizeStats(ModelSizeStats modelSizeStats, Supplier<Boolean> shouldRetry) { String jobId = modelSizeStats.getJobId(); logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); Persistable persistable = new Persistable(jobId, modelSizeStats, modelSizeStats.getId()); - persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet(); + persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId), shouldRetry); } /** @@ -341,7 +351,7 @@ public void commitStateWrites(String jobId) { * @param timingStats datafeed timing stats to persist * @param refreshPolicy refresh policy to apply */ - public IndexResponse persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) { + public BulkResponse persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) { String jobId = timingStats.getJobId(); logger.trace("[{}] Persisting datafeed timing stats", jobId); Persistable persistable = new Persistable( @@ -350,7 +360,7 @@ public IndexResponse persistDatafeedTimingStats(DatafeedTimingStats timingStats, new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")), DatafeedTimingStats.documentId(timingStats.getJobId())); persistable.setRefreshPolicy(refreshPolicy); - return
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet(); + return persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId), () -> true); } private static XContentBuilder toXContentBuilder(ToXContent obj, ToXContent.Params params) throws IOException { @@ -383,10 +393,25 @@ void setRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) { this.refreshPolicy = refreshPolicy; } - ActionFuture<IndexResponse> persist(String indexName) { - PlainActionFuture<IndexResponse> actionFuture = PlainActionFuture.newFuture(); - persist(indexName, actionFuture); - return actionFuture; + BulkResponse persist(String indexName, Supplier<Boolean> shouldRetry) { + logCall(indexName); + try { + return resultsPersisterService.indexWithRetry(jobId, + indexName, + object, + params, + refreshPolicy, + id, + shouldRetry, + (msg) -> auditor.warning(jobId, id + " " + msg)); + } catch (IOException e) { + logger.error(new ParameterizedMessage("[{}] Error writing [{}]", jobId, (id == null) ? "auto-generated ID" : id), e); + IndexResponse.Builder notCreatedResponse = new IndexResponse.Builder(); + notCreatedResponse.setResult(Result.NOOP); + return new BulkResponse( + new BulkItemResponse[]{new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, notCreatedResponse.build())}, + 0); + } } void persist(String indexName, ActionListener<IndexResponse> listener) { @@ -411,4 +436,5 @@ private void logCall(String indexName) { } } } + } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index dff118011b147..1d3a42b5fa9b6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -7,7 +7,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; @@ -229,13 +228,13 @@ public long getAnalysedFieldsPerRecord() { /** * Report the counts now regardless of whether or not we are at a reporting boundary.
*/ - public void finishReporting(ActionListener<Boolean> listener) { + public void finishReporting() { Date now = new Date(); incrementalRecordStats.setLastDataTimeStamp(now); totalRecordStats.setLastDataTimeStamp(now); diagnostics.flush(); retrieveDiagnosticsIntermediateResults(); - dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), listener); + dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats()); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index c9441e9f60c39..422f13926d441 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; @@ -109,8 +110,7 @@ public AutodetectResultProcessor(Client client, // Visible for testing AutodetectResultProcessor(Client client, AnomalyDetectionAuditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister, AutodetectProcess autodetectProcess, ModelSizeStats latestModelSizeStats, - TimingStats timingStats, - FlushListener flushListener) { + TimingStats timingStats, FlushListener flushListener) { this.client = Objects.requireNonNull(client); this.auditor = Objects.requireNonNull(auditor); this.jobId = Objects.requireNonNull(jobId); @@ -119,7 +119,7 @@ public AutodetectResultProcessor(Client client, this.process = Objects.requireNonNull(autodetectProcess); this.flushListener = Objects.requireNonNull(flushListener); this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats); - this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId); + this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive); this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister); this.deleteInterimRequired = true; } @@ -177,10 +177,7 @@ private void readResults() { LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); } } catch (Exception e) { - if (processKilled) { - throw e; - } - if (process.isProcessAliveAfterWaiting() == false) { + if (isAlive() == false) { throw e; } LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e); @@ -215,7 +212,6 @@ void processResult(AutodetectResult result) { // results are also interim timingStatsReporter.reportBucket(bucket); bulkResultsPersister.persistBucket(bucket).executeRequest(); - ++bucketCount; } List<AnomalyRecord> records = result.getRecords(); @@ -228,7 +224,7 @@ void processResult(AutodetectResult result) { } CategoryDefinition categoryDefinition = result.getCategoryDefinition(); if (categoryDefinition != null) { - persister.persistCategoryDefinition(categoryDefinition); + persister.persistCategoryDefinition(categoryDefinition, this::isAlive); } ModelPlot modelPlot = result.getModelPlot(); if (modelPlot != null) { @@ -264,7 +260,9 @@ void processResult(AutodetectResult result) { ModelSnapshot
modelSnapshot = result.getModelSnapshot(); if (modelSnapshot != null) { // We need to refresh in order for the snapshot to be available when we try to update the job with it - IndexResponse indexResponse = persister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); + BulkResponse bulkResponse = persister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE, this::isAlive); + assert bulkResponse.getItems().length == 1; + IndexResponse indexResponse = bulkResponse.getItems()[0].getResponse(); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { updateModelSnapshotOnJob(modelSnapshot); } @@ -272,7 +270,7 @@ void processResult(AutodetectResult result) { Quantiles quantiles = result.getQuantiles(); if (quantiles != null) { LOGGER.debug("[{}] Parsed Quantiles with timestamp {}", jobId, quantiles.getTimestamp()); - persister.persistQuantiles(quantiles); + persister.persistQuantiles(quantiles, this::isAlive); bulkResultsPersister.executeRequest(); if (processKilled == false && renormalizer.isEnabled()) { @@ -316,7 +314,7 @@ private void processModelSizeStats(ModelSizeStats modelSizeStats) { modelSizeStats.getTotalOverFieldCount(), modelSizeStats.getTotalPartitionFieldCount(), modelSizeStats.getBucketAllocationFailuresCount(), modelSizeStats.getMemoryStatus()); - persister.persistModelSizeStats(modelSizeStats); + persister.persistModelSizeStats(modelSizeStats, this::isAlive); notifyModelMemoryStatusChange(modelSizeStats); latestModelSizeStats = modelSizeStats; } @@ -435,6 +433,13 @@ boolean isDeleteInterimRequired() { return deleteInterimRequired; } + private boolean isAlive() { + if (processKilled) { + return false; + } + return process.isProcessAliveAfterWaiting(); + } + void setDeleteInterimRequired(boolean deleteInterimRequired) { this.deleteInterimRequired = deleteInterimRequired; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriter.java index 887b43d0b5927..efa5f767720c4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriter.java @@ -7,7 +7,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; @@ -128,11 +127,8 @@ public void write(InputStream inputStream, CategorizationAnalyzer categorization transformTimeAndWrite(record, inputFieldCount); } - // This function can throw - dataCountsReporter.finishReporting(ActionListener.wrap( - response -> handler.accept(dataCountsReporter.incrementalStats(), null), - e -> handler.accept(null, e) - )); + dataCountsReporter.finishReporting(); + handler.accept(dataCountsReporter.incrementalStats(), null); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java index 92fe2c3b0b50a..59c138cba3758 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java @@ -7,7 +7,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; @@ -53,7 +52,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter { * the OutputStream. No transformation is applied to the data the timestamp * is expected in seconds from the epoch. If any of the fields in * analysisFields or the DataDescriptions - * timeField is missing from the JOSN inputIndex an exception is thrown + * timeField is missing from the JSON inputIndex an exception is thrown */ @Override public void write(InputStream inputStream, CategorizationAnalyzer categorizationAnalyzer, XContentType xContentType, @@ -70,12 +69,8 @@ public void write(InputStream inputStream, CategorizationAnalyzer categorization + "] is not supported by JsonDataToProcessWriter"); } - // this line can throw and will be propagated - dataCountsReporter.finishReporting( - ActionListener.wrap( - response -> handler.accept(dataCountsReporter.incrementalStats(), null), - e -> handler.accept(null, e) - )); + dataCountsReporter.finishReporting(); + handler.accept(dataCountsReporter.incrementalStats(), null); } private void writeJsonXContent(CategorizationAnalyzer categorizationAnalyzer, InputStream inputStream) throws IOException { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java new file mode 100644 index 0000000000000..6aad7d6a4f9b8 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java @@ -0,0 +1,173 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.utils.persistence; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Random; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; + +public class ResultsPersisterService { + private static final Logger LOGGER = LogManager.getLogger(ResultsPersisterService.class); + + public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting( + "xpack.ml.persist_results_max_retries", + 20, + 0, + 50, + Setting.Property.Dynamic, + Setting.Property.NodeScope); + private static final int MAX_RETRY_SLEEP_MILLIS = (int) Duration.ofMinutes(15).toMillis(); + private static final int MIN_RETRY_SLEEP_MILLIS = 50; + // Having an exponent higher than this causes integer overflow + private static final int MAX_RETRY_EXPONENT = 24; + + private final Random random = Randomness.get(); + private final Client client; + private volatile int maxFailureRetries; + + public ResultsPersisterService(Client client, ClusterService clusterService, Settings settings) { + this.client = client; + this.maxFailureRetries = PERSIST_RESULTS_MAX_RETRIES.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(PERSIST_RESULTS_MAX_RETRIES, this::setMaxFailureRetries); + } + + void setMaxFailureRetries(int value) { + this.maxFailureRetries = value; + } + + public BulkResponse indexWithRetry(String jobId, + String indexName, + ToXContent object, + ToXContent.Params params, + WriteRequest.RefreshPolicy refreshPolicy, + String id, + Supplier<Boolean> shouldRetry, + Consumer<String> msgHandler) throws IOException { + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy); + try (XContentBuilder content = object.toXContent(XContentFactory.jsonBuilder(), params)) { + bulkRequest.add(new IndexRequest(indexName).id(id).source(content)); + } + return bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, msgHandler); + } + + public BulkResponse bulkIndexWithRetry(BulkRequest bulkRequest, + String jobId, + Supplier<Boolean> shouldRetry, + Consumer<String> msgHandler) { + int currentMin = MIN_RETRY_SLEEP_MILLIS; + int currentMax = MIN_RETRY_SLEEP_MILLIS; + int currentAttempt = 0; + BulkResponse bulkResponse = null; + while (currentAttempt <= maxFailureRetries) { + bulkResponse = bulkIndex(bulkRequest); + if (bulkResponse.hasFailures() == false) { + return bulkResponse; + } + if (shouldRetry.get() == false)
{ + throw new ElasticsearchException("[{}] failed to index all results. {}", jobId, bulkResponse.buildFailureMessage()); + } + if (currentAttempt > maxFailureRetries) { + LOGGER.warn("[{}] failed to index after [{}] attempts. Setting [xpack.ml.persist_results_max_retries] was reduced", + jobId, + currentAttempt); + throw new ElasticsearchException("[{}] failed to index all results after [{}] attempts. {}", + jobId, + currentAttempt, + bulkResponse.buildFailureMessage()); + } + currentAttempt++; + // Since we increase the sleep time exponentially, we don't want the randomness to force an excessively long sleep + if (currentMax < MAX_RETRY_SLEEP_MILLIS) { + currentMin = currentMax; + } + // Exponential backoff calculation taken from: https://en.wikipedia.org/wiki/Exponential_backoff + int uncappedBackoff = ((1 << Math.min(currentAttempt, MAX_RETRY_EXPONENT)) - 1) * (50); + currentMax = Math.min(uncappedBackoff, MAX_RETRY_SLEEP_MILLIS); + // It's good to have a random window along the exponentially increasing curve + // so that not all bulk requests rest for the same amount of time + int randBound = 1 + (currentMax - currentMin); + int randSleep = currentMin + random.nextInt(randBound); + { + String msg = new ParameterizedMessage( + "failed to index after [{}] attempts. Will attempt again in [{}].", + currentAttempt, + TimeValue.timeValueMillis(randSleep).getStringRep()) + .getFormattedMessage(); + LOGGER.warn(() -> new ParameterizedMessage("[{}] {}", jobId, msg)); + msgHandler.accept(msg); + } + // We should only retry the docs that failed. + bulkRequest = buildNewRequestFromFailures(bulkRequest, bulkResponse); + try { + Thread.sleep(randSleep); + } catch (InterruptedException interruptedException) { + LOGGER.warn( + new ParameterizedMessage("[{}] failed to index after [{}] attempts due to interruption", + jobId, + currentAttempt), + interruptedException); + Thread.currentThread().interrupt(); + } + } + String bulkFailureMessage = bulkResponse == null ? "" : bulkResponse.buildFailureMessage(); + LOGGER.warn("[{}] failed to index after [{}] attempts.", jobId, currentAttempt); + throw new ElasticsearchException("[{}] failed to index all results after [{}] attempts.
{}", + jobId, + currentAttempt, + bulkFailureMessage); + } + + private BulkResponse bulkIndex(BulkRequest bulkRequest) { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { + return client.bulk(bulkRequest).actionGet(); + } + } + + private BulkRequest buildNewRequestFromFailures(BulkRequest bulkRequest, BulkResponse bulkResponse) { + // If we failed, lets set the bulkRequest to be a collection of the failed requests + BulkRequest bulkRequestOfFailures = new BulkRequest(); + Set failedDocIds = Arrays.stream(bulkResponse.getItems()) + .filter(BulkItemResponse::isFailed) + .map(BulkItemResponse::getId) + .collect(Collectors.toSet()); + bulkRequest.requests().forEach(docWriteRequest -> { + if (failedDocIds.contains(docWriteRequest.id())) { + bulkRequestOfFailures.add(docWriteRequest); + } + }); + return bulkRequestOfFailures; + } + +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index f6ec2fc9b89f1..a46e83d2488fe 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -6,13 +6,19 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.action.util.QueryPage; @@ -33,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.ml.LocalStateMachineLearning; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; +import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; @@ -46,10 +53,12 @@ import org.elasticsearch.xpack.ml.job.results.CategoryDefinitionTests; import org.elasticsearch.xpack.ml.job.results.ModelPlotTests; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import org.junit.After; import org.junit.Before; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -77,6 +86,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { private AutodetectResultProcessor resultProcessor; private Renormalizer 
renormalizer; private AutodetectProcess process; + private ResultsPersisterService resultsPersisterService; @Override protected Collection<Class<? extends Plugin>> getPlugins() { @@ -92,12 +102,24 @@ public void createComponents() throws Exception { renormalizer = mock(Renormalizer.class); process = mock(AutodetectProcess.class); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); + ThreadPool tp = mock(ThreadPool.class); + Settings settings = Settings.builder().put("node.name", "InferenceProcessorFactoryTests_node").build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, + new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS, + MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, + ClusterService.USER_DEFINED_META_DATA, + ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING))); + ClusterService clusterService = new ClusterService(settings, clusterSettings, tp); + + resultsPersisterService = new ResultsPersisterService(client(), clusterService, settings); resultProcessor = new AutodetectResultProcessor( client(), auditor, JOB_ID, renormalizer, - new JobResultsPersister(client()), + new JobResultsPersister(client(), resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node")), process, new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java index a026d5d6c337b..1a7f60cd81d54 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java @@ -5,22 +5,34 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import org.junit.Before; +import java.util.Arrays; import java.util.Date; +import java.util.HashSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.Mockito.mock; public class EstablishedMemUsageIT extends BaseMlIntegTestCase { @@ -32,8 +44,21 @@ public class EstablishedMemUsageIT extends
BaseMlIntegTestCase { @Before public void createComponents() { Settings settings = nodeSettings(0); + ThreadPool tp = mock(ThreadPool.class); + ClusterSettings clusterSettings = new ClusterSettings(settings, + new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS, + MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, + OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, + ClusterService.USER_DEFINED_META_DATA, + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING))); + ClusterService clusterService = new ClusterService(settings, clusterSettings, tp); + + ResultsPersisterService resultsPersisterService = new ResultsPersisterService(client(), clusterService, settings); jobResultsProvider = new JobResultsProvider(client(), settings); - jobResultsPersister = new JobResultsPersister(client()); + jobResultsPersister = new JobResultsPersister(client(), + resultsPersisterService, + new AnomalyDetectionAuditor(client(), "test_node")); } public void testEstablishedMem_givenNoResults() throws Exception { @@ -222,7 +247,7 @@ private void initClusterAndJob(String jobId) { } private void createBuckets(String jobId, int count) { - JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId); + JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId, () -> true); for (int i = 1; i <= count; ++i) { Bucket bucket = new Bucket(jobId, new Date(bucketSpan * i), bucketSpan); builder.persistBucket(bucket); @@ -235,7 +260,7 @@ private ModelSizeStats createModelSizeStats(String jobId, int bucketNum, long mo .setTimestamp(new Date(bucketSpan * bucketNum)) .setLogTime(new Date(bucketSpan * bucketNum + randomIntBetween(1, 1000))) .setModelBytes(modelBytes).build(); - jobResultsPersister.persistModelSizeStats(modelSizeStats); + jobResultsPersister.persistModelSizeStats(modelSizeStats, () -> true); return modelSizeStats; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index bde16f7e9ac07..540ee7b5adfc3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -15,14 +15,20 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; import 
org.elasticsearch.xpack.core.ml.action.PutJobAction; @@ -46,12 +52,15 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; +import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor; import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import org.junit.Before; import java.io.IOException; @@ -73,17 +82,32 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.collection.IsEmptyCollection.empty; import static org.hamcrest.core.Is.is; +import static org.mockito.Mockito.mock; public class JobResultsProviderIT extends MlSingleNodeTestCase { private JobResultsProvider jobProvider; + private ResultsPersisterService resultsPersisterService; + private AnomalyDetectionAuditor auditor; @Before public void createComponents() throws Exception { Settings.Builder builder = Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1)); jobProvider = new JobResultsProvider(client(), builder.build()); + ThreadPool tp = mock(ThreadPool.class); + ClusterSettings clusterSettings = new ClusterSettings(builder.build(), + new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS, + MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, + ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, + ClusterService.USER_DEFINED_META_DATA, + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING))); + ClusterService clusterService = new ClusterService(builder.build(), clusterSettings, tp); + + resultsPersisterService = new ResultsPersisterService(client(), clusterService, builder.build()); + auditor = new AnomalyDetectionAuditor(client(), "test_node"); waitForMlTemplates(); } @@ -574,29 +598,9 @@ private void indexScheduledEvents(List<ScheduledEvent> events) throws IOExceptio } } - private void indexDataCounts(DataCounts counts, String jobId) throws Exception { - JobDataCountsPersister persister = new JobDataCountsPersister(client()); - - AtomicReference<Exception> errorHolder = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - persister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() { - @Override - public void onResponse(Boolean aBoolean) { - assertTrue(aBoolean); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - errorHolder.set(e); - latch.countDown(); - } - }); - - latch.await(); - if (errorHolder.get() != null) { - throw errorHolder.get(); - } + private void indexDataCounts(DataCounts counts, String jobId) { + JobDataCountsPersister persister = new JobDataCountsPersister(client(), resultsPersisterService, auditor); + persister.persistDataCounts(jobId, counts); } private void indexFilters(List<MlFilter> filters) throws IOException { @@ -616,18 +620,18 @@ private void
indexModelSizeStats(ModelSizeStats modelSizeStats) { - JobResultsPersister persister = new JobResultsPersister(client()); - persister.persistModelSizeStats(modelSizeStats); + JobResultsPersister persister = new JobResultsPersister(client(), resultsPersisterService, auditor); + persister.persistModelSizeStats(modelSizeStats, () -> true); } private void indexModelSnapshot(ModelSnapshot snapshot) { - JobResultsPersister persister = new JobResultsPersister(client()); - persister.persistModelSnapshot(snapshot, WriteRequest.RefreshPolicy.IMMEDIATE); + JobResultsPersister persister = new JobResultsPersister(client(), resultsPersisterService, auditor); + persister.persistModelSnapshot(snapshot, WriteRequest.RefreshPolicy.IMMEDIATE, () -> true); } private void indexQuantiles(Quantiles quantiles) { - JobResultsPersister persister = new JobResultsPersister(client()); - persister.persistQuantiles(quantiles); + JobResultsPersister persister = new JobResultsPersister(client(), resultsPersisterService, auditor); + persister.persistQuantiles(quantiles, () -> true); } private void indexCalendars(List<Calendar> calendars) throws IOException { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index 09e5cf9ce6331..7bf719878c157 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -6,17 +6,19 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; @@ -27,19 +29,23 @@ import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; +import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import org.mockito.ArgumentCaptor; -import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.HashSet; import java.util.List; import java.util.Map; import static org.hamcrest.Matchers.equalTo; import static
org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -51,7 +57,7 @@ public class JobResultsPersisterTests extends ESTestCase { private static final String JOB_ID = "foo"; - public void testPersistBucket_OneRecord() throws IOException { + public void testPersistBucket_OneRecord() { ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(captor); Bucket bucket = new Bucket("foo", new Date(), 123456); @@ -72,8 +78,8 @@ public void testPersistBucket_OneRecord() throws IOException { AnomalyRecord record = new AnomalyRecord(JOB_ID, new Date(), 600); bucket.setRecords(Collections.singletonList(record)); - JobResultsPersister persister = new JobResultsPersister(client); - persister.bulkPersisterBuilder(JOB_ID).persistBucket(bucket).executeRequest(); + JobResultsPersister persister = new JobResultsPersister(client, buildResultsPersisterService(client), makeAuditor()); + persister.bulkPersisterBuilder(JOB_ID, () -> true).persistBucket(bucket).executeRequest(); BulkRequest bulkRequest = captor.getValue(); assertEquals(2, bulkRequest.numberOfActions()); @@ -94,7 +100,7 @@ public void testPersistBucket_OneRecord() throws IOException { assertTrue(s.matches(".*raw_anomaly_score.:19\\.19.*")); } - public void testPersistRecords() throws IOException { + public void testPersistRecords() { ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(captor); @@ -124,8 +130,8 @@ public void testPersistRecords() throws IOException { typicals.add(998765.3); r1.setTypical(typicals); - JobResultsPersister persister = new JobResultsPersister(client); - persister.bulkPersisterBuilder(JOB_ID).persistRecords(records).executeRequest(); + JobResultsPersister persister = new JobResultsPersister(client, buildResultsPersisterService(client), makeAuditor()); + persister.bulkPersisterBuilder(JOB_ID, () -> true).persistRecords(records).executeRequest(); BulkRequest bulkRequest = captor.getValue(); assertEquals(1, bulkRequest.numberOfActions()); @@ -149,7 +155,7 @@ public void testPersistRecords() throws IOException { assertTrue(s.matches(".*over_field_value.:.overValue.*")); } - public void testPersistInfluencers() throws IOException { + public void testPersistInfluencers() { ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(captor); @@ -160,8 +166,8 @@ public void testPersistInfluencers() throws IOException { inf.setProbability(0.4); influencers.add(inf); - JobResultsPersister persister = new JobResultsPersister(client); - persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers).executeRequest(); + JobResultsPersister persister = new JobResultsPersister(client, buildResultsPersisterService(client), makeAuditor()); + persister.bulkPersisterBuilder(JOB_ID, () -> true).persistInfluencers(influencers).executeRequest(); BulkRequest bulkRequest = captor.getValue(); assertEquals(1, bulkRequest.numberOfActions()); @@ -176,7 +182,7 @@ public void testPersistInfluencers() throws IOException { public void testExecuteRequest_ClearsBulkRequest() { ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(captor); - JobResultsPersister persister = new JobResultsPersister(client); + JobResultsPersister persister = new JobResultsPersister(client, buildResultsPersisterService(client), 
makeAuditor()); List influencers = new ArrayList<>(); Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1", new Date(), 600); @@ -185,7 +191,7 @@ public void testExecuteRequest_ClearsBulkRequest() { inf.setProbability(0.4); influencers.add(inf); - JobResultsPersister.Builder builder = persister.bulkPersisterBuilder(JOB_ID); + JobResultsPersister.Builder builder = persister.bulkPersisterBuilder(JOB_ID, () -> true); builder.persistInfluencers(influencers).executeRequest(); assertEquals(0, builder.getBulkRequest().numberOfActions()); } @@ -193,9 +199,9 @@ public void testExecuteRequest_ClearsBulkRequest() { public void testBulkRequestExecutesWhenReachMaxDocs() { ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(captor); - JobResultsPersister persister = new JobResultsPersister(client); + JobResultsPersister persister = new JobResultsPersister(client, buildResultsPersisterService(client), makeAuditor()); - JobResultsPersister.Builder bulkBuilder = persister.bulkPersisterBuilder("foo"); + JobResultsPersister.Builder bulkBuilder = persister.bulkPersisterBuilder("foo", () -> true); ModelPlot modelPlot = new ModelPlot("foo", new Date(), 123456, 0); for (int i=0; i<=JobRenormalizedResultsPersister.BULK_LIMIT; i++) { bulkBuilder.persistModelPlot(modelPlot); @@ -210,11 +216,11 @@ public void testPersistTimingStats() { ArgumentCaptor bulkRequestCaptor = ArgumentCaptor.forClass(BulkRequest.class); Client client = mockClient(bulkRequestCaptor); - JobResultsPersister persister = new JobResultsPersister(client); + JobResultsPersister persister = new JobResultsPersister(client, buildResultsPersisterService(client), makeAuditor()); TimingStats timingStats = new TimingStats( "foo", 7, 1.0, 2.0, 1.23, 7.89, new ExponentialAverageCalculationContext(600.0, Instant.ofEpochMilli(123456789), 60.0)); - persister.bulkPersisterBuilder(JOB_ID).persistTimingStats(timingStats).executeRequest(); + persister.bulkPersisterBuilder(JOB_ID, () -> true).persistTimingStats(timingStats).executeRequest(); verify(client, times(1)).bulk(bulkRequestCaptor.capture()); BulkRequest bulkRequest = bulkRequestCaptor.getValue(); @@ -245,28 +251,20 @@ public void testPersistTimingStats() { @SuppressWarnings({"unchecked", "rawtypes"}) public void testPersistDatafeedTimingStats() { Client client = mockClient(ArgumentCaptor.forClass(BulkRequest.class)); - doAnswer( - invocationOnMock -> { - // Take the listener passed to client::index as 2nd argument - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; - // Handle the response on the listener - listener.onResponse(new IndexResponse(new ShardId("test", "test", 0), "test", 0, 0, 0, false)); - return null; - }) - .when(client).index(any(), any(ActionListener.class)); - - JobResultsPersister persister = new JobResultsPersister(client); + JobResultsPersister persister = new JobResultsPersister(client, buildResultsPersisterService(client), makeAuditor()); DatafeedTimingStats timingStats = new DatafeedTimingStats( "foo", 6, 66, 666.0, new ExponentialAverageCalculationContext(600.0, Instant.ofEpochMilli(123456789), 60.0)); persister.persistDatafeedTimingStats(timingStats, WriteRequest.RefreshPolicy.IMMEDIATE); - ArgumentCaptor indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class); - verify(client, times(1)).index(indexRequestCaptor.capture(), any(ActionListener.class)); - IndexRequest indexRequest = indexRequestCaptor.getValue(); + ArgumentCaptor indexRequestCaptor = 
ArgumentCaptor.forClass(BulkRequest.class); + verify(client, times(1)).bulk(indexRequestCaptor.capture()); + + // Refresh policy is set on the bulk request, not the individual index requests + assertThat(indexRequestCaptor.getValue().getRefreshPolicy(), equalTo(WriteRequest.RefreshPolicy.IMMEDIATE)); + IndexRequest indexRequest = (IndexRequest)indexRequestCaptor.getValue().requests().get(0); assertThat(indexRequest.index(), equalTo(".ml-anomalies-.write-foo")); assertThat(indexRequest.id(), equalTo("foo_datafeed_timing_stats")); - assertThat(indexRequest.getRefreshPolicy(), equalTo(WriteRequest.RefreshPolicy.IMMEDIATE)); assertThat( indexRequest.sourceAsMap(), equalTo( @@ -285,15 +283,51 @@ public void testPersistDatafeedTimingStats() { verifyNoMoreInteractions(client); } - @SuppressWarnings({"unchecked"}) private Client mockClient(ArgumentCaptor captor) { + return mockClientWithResponse(captor, new BulkResponse(new BulkItemResponse[0], 0L)); + } + + @SuppressWarnings("unchecked") + private Client mockClientWithResponse(ArgumentCaptor captor, BulkResponse... responses) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); - ActionFuture future = mock(ActionFuture.class); - when(future.actionGet()).thenReturn(new BulkResponse(new BulkItemResponse[0], 0L)); - when(client.bulk(captor.capture())).thenReturn(future); + List> futures = new ArrayList<>(responses.length - 1); + ActionFuture future1 = makeFuture(responses[0]); + for (int i = 1; i < responses.length; i++) { + futures.add(makeFuture(responses[i])); + } + when(client.bulk(captor.capture())).thenReturn(future1, futures.toArray(ActionFuture[]::new)); return client; } + + @SuppressWarnings("unchecked") + private static ActionFuture makeFuture(BulkResponse response) { + ActionFuture future = mock(ActionFuture.class); + when(future.actionGet()).thenReturn(response); + return future; + } + + private ResultsPersisterService buildResultsPersisterService(Client client) { + ThreadPool tp = mock(ThreadPool.class); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, + new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS, + MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, + ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, + ClusterService.USER_DEFINED_META_DATA, + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING))); + ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, tp); + + return new ResultsPersisterService(client, clusterService, Settings.EMPTY); + } + + private AnomalyDetectionAuditor makeAuditor() { + AnomalyDetectionAuditor anomalyDetectionAuditor = mock(AnomalyDetectionAuditor.class); + doNothing().when(anomalyDetectionAuditor).warning(any(), any()); + doNothing().when(anomalyDetectionAuditor).info(any(), any()); + doNothing().when(anomalyDetectionAuditor).error(any(), any()); + return anomalyDetectionAuditor; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java index 5415b46019196..e9faff382edb3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java +++ 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.job.process; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; @@ -17,7 +16,6 @@ import org.junit.Before; import org.mockito.Mockito; -import java.io.IOException; import java.util.Arrays; import java.util.Date; import java.util.concurrent.TimeUnit; @@ -26,6 +24,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; public class DataCountsReporterTests extends ESTestCase { @@ -50,14 +49,14 @@ public void setUpMocks() { jobDataCountsPersister = Mockito.mock(JobDataCountsPersister.class); } - public void testSimpleConstructor() throws Exception { + public void testSimpleConstructor() { DataCountsReporter dataCountsReporter = new DataCountsReporter(job, new DataCounts(job.getId()), jobDataCountsPersister); DataCounts stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); assertAllCountFieldsEqualZero(stats); } - public void testComplexConstructor() throws Exception { + public void testComplexConstructor() { DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, 6L, 7L, 8L, new Date(), new Date(), new Date(), new Date(), new Date()); @@ -77,7 +76,7 @@ public void testComplexConstructor() throws Exception { assertNull(stats.getEarliestRecordTimeStamp()); } - public void testResetIncrementalCounts() throws Exception { + public void testResetIncrementalCounts() { DataCountsReporter dataCountsReporter = new DataCountsReporter(job, new DataCounts(job.getId()), jobDataCountsPersister); DataCounts stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); @@ -119,7 +118,7 @@ public void testResetIncrementalCounts() throws Exception { assertEquals(602000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime()); // send 'flush' signal - dataCountsReporter.finishReporting(ActionListener.wrap(r -> {}, e -> {})); + dataCountsReporter.finishReporting(); assertEquals(2, dataCountsReporter.runningTotalStats().getBucketCount()); assertEquals(1, dataCountsReporter.runningTotalStats().getEmptyBucketCount()); assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount()); @@ -129,7 +128,7 @@ public void testResetIncrementalCounts() throws Exception { assertEquals(0, dataCountsReporter.incrementalStats().getSparseBucketCount()); } - public void testReportLatestTimeIncrementalStats() throws IOException { + public void testReportLatestTimeIncrementalStats() { DataCountsReporter dataCountsReporter = new DataCountsReporter(job, new DataCounts(job.getId()), jobDataCountsPersister); dataCountsReporter.startNewIncrementalCount(); dataCountsReporter.reportLatestTimeIncrementalStats(5001L); @@ -157,7 +156,7 @@ public void testReportRecordsWritten() { assertEquals(dataCountsReporter.incrementalStats(), dataCountsReporter.runningTotalStats()); - verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class), any()); + verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class)); } public void testReportRecordsWritten_Given9999Records() { @@ -256,7 +255,7 @@ public void testFinishReporting() { 
dataCountsReporter.reportRecordWritten(5, 2000); dataCountsReporter.reportRecordWritten(5, 3000); dataCountsReporter.reportMissingField(); - dataCountsReporter.finishReporting(ActionListener.wrap(r -> {}, e -> {})); + dataCountsReporter.finishReporting(); long lastReportedTimeMs = dataCountsReporter.incrementalStats().getLastDataTimeStamp().getTime(); // check last data time is equal to now give or take a second @@ -266,11 +265,11 @@ public void testFinishReporting() { dataCountsReporter.runningTotalStats().getLastDataTimeStamp()); dc.setLastDataTimeStamp(dataCountsReporter.incrementalStats().getLastDataTimeStamp()); - Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("sr"), eq(dc), any()); + verify(jobDataCountsPersister, times(1)).persistDataCounts(eq("sr"), eq(dc)); assertEquals(dc, dataCountsReporter.incrementalStats()); } - private void assertAllCountFieldsEqualZero(DataCounts stats) throws Exception { + private void assertAllCountFieldsEqualZero(DataCounts stats) { assertEquals(0L, stats.getProcessedRecordCount()); assertEquals(0L, stats.getProcessedFieldCount()); assertEquals(0L, stats.getInputBytes()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 4562779fc292f..e9fbaf4bbb0d5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; @@ -55,6 +54,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -235,10 +235,7 @@ private AutodetectCommunicator createAutodetectCommunicator(ExecutorService exec AutodetectResultProcessor autodetectResultProcessor, BiConsumer<Exception, Boolean> finishHandler) throws IOException { DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class); - doAnswer(invocation -> { - ((ActionListener<Boolean>) invocation.getArguments()[0]).onResponse(true); - return null; - }).when(dataCountsReporter).finishReporting(any()); + doNothing().when(dataCountsReporter).finishReporting(); return new AutodetectCommunicator(createJobDetails(), environment, autodetectProcess, stateStreamer, dataCountsReporter, autodetectResultProcessor, finishHandler, new NamedXContentRegistry(Collections.emptyList()), executorService); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 72bdf45a96c4a..09a92e57d0432 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -54,6 +54,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.process.NativeStorageProvider; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -61,6 +62,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashSet; @@ -143,7 +145,7 @@ public void setup() throws Exception { jobManager = mock(JobManager.class); jobResultsProvider = mock(JobResultsProvider.class); jobResultsPersister = mock(JobResultsPersister.class); - when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class)); + when(jobResultsPersister.bulkPersisterBuilder(any(), any())).thenReturn(mock(JobResultsPersister.Builder.class)); jobDataCountsPersister = mock(JobDataCountsPersister.class); autodetectCommunicator = mock(AutodetectCommunicator.class); autodetectFactory = mock(AutodetectProcessFactory.class); @@ -151,7 +153,9 @@ public void setup() throws Exception { auditor = mock(AnomalyDetectionAuditor.class); clusterService = mock(ClusterService.class); ClusterSettings clusterSettings = - new ClusterSettings(Settings.EMPTY, Collections.singleton(MachineLearning.MAX_OPEN_JOBS_PER_NODE)); + new ClusterSettings(Settings.EMPTY, + new HashSet<>(Arrays.asList(MachineLearning.MAX_OPEN_JOBS_PER_NODE, + ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES))); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); MetaData metaData = mock(MetaData.class); SortedMap<String, AliasOrIndex> aliasOrIndexSortedMap = new TreeMap<>(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index 63ca73444b540..2f31015b575c7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -8,6 +8,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; @@ -95,7 +98,7 @@ public void setUpMocks() { renormalizer = mock(Renormalizer.class); persister = mock(JobResultsPersister.class); bulkBuilder = mock(JobResultsPersister.Builder.class); - when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); + when(persister.bulkPersisterBuilder(eq(JOB_ID), any())).thenReturn(bulkBuilder); process = mock(AutodetectProcess.class); flushListener = mock(FlushListener.class); processorUnderTest = new AutodetectResultProcessor( @@ -125,12 +128,12 @@ public void testProcess() throws TimeoutException { 
assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L))); verify(renormalizer).waitUntilIdle(); - verify(persister).bulkPersisterBuilder(JOB_ID); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(persister).commitResultWrites(JOB_ID); verify(persister).commitStateWrites(JOB_ID); } - public void testProcessResult_bucket() { + public void testProcessResult_bucket() throws Exception { when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); AutodetectResult result = mock(AutodetectResult.class); @@ -143,11 +146,11 @@ public void testProcessResult_bucket() { verify(bulkBuilder).persistTimingStats(any(TimingStats.class)); verify(bulkBuilder).persistBucket(bucket); verify(bulkBuilder).executeRequest(); - verify(persister).bulkPersisterBuilder(JOB_ID); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(persister, never()).deleteInterimResults(JOB_ID); } - public void testProcessResult_bucket_deleteInterimRequired() { + public void testProcessResult_bucket_deleteInterimRequired() throws Exception { when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); AutodetectResult result = mock(AutodetectResult.class); @@ -160,11 +163,11 @@ public void testProcessResult_bucket_deleteInterimRequired() { verify(bulkBuilder).persistTimingStats(any(TimingStats.class)); verify(bulkBuilder).persistBucket(bucket); verify(bulkBuilder).executeRequest(); - verify(persister).bulkPersisterBuilder(JOB_ID); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(persister).deleteInterimResults(JOB_ID); } - public void testProcessResult_records() { + public void testProcessResult_records() throws Exception { AutodetectResult result = mock(AutodetectResult.class); List<AnomalyRecord> records = Arrays.asList( @@ -177,10 +180,10 @@ public void testProcessResult_records() { verify(bulkBuilder).persistRecords(records); verify(bulkBuilder, never()).executeRequest(); - verify(persister).bulkPersisterBuilder(JOB_ID); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); } - public void testProcessResult_influencers() { + public void testProcessResult_influencers() throws Exception { AutodetectResult result = mock(AutodetectResult.class); List<Influencer> influencers = Arrays.asList( @@ -193,10 +196,10 @@ public void testProcessResult_influencers() { verify(bulkBuilder).persistInfluencers(influencers); verify(bulkBuilder, never()).executeRequest(); - verify(persister).bulkPersisterBuilder(JOB_ID); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); } - public void testProcessResult_categoryDefinition() { + public void testProcessResult_categoryDefinition() throws Exception { AutodetectResult result = mock(AutodetectResult.class); CategoryDefinition categoryDefinition = mock(CategoryDefinition.class); when(result.getCategoryDefinition()).thenReturn(categoryDefinition); @@ -205,11 +208,11 @@ public void testProcessResult_categoryDefinition() { processorUnderTest.processResult(result); verify(bulkBuilder, never()).executeRequest(); - verify(persister).persistCategoryDefinition(categoryDefinition); - verify(persister).bulkPersisterBuilder(JOB_ID); + verify(persister).persistCategoryDefinition(eq(categoryDefinition), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); } - public void testProcessResult_flushAcknowledgement() { + public void 
testProcessResult_flushAcknowledgement() throws Exception { AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(flushAcknowledgement.getId()).thenReturn(JOB_ID); @@ -219,13 +222,13 @@ public void testProcessResult_flushAcknowledgement() { processorUnderTest.processResult(result); assertTrue(processorUnderTest.isDeleteInterimRequired()); - verify(persister).bulkPersisterBuilder(JOB_ID); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(flushListener).acknowledgeFlush(flushAcknowledgement, null); verify(persister).commitResultWrites(JOB_ID); verify(bulkBuilder).executeRequest(); } - public void testProcessResult_flushAcknowledgementMustBeProcessedLast() { + public void testProcessResult_flushAcknowledgementMustBeProcessedLast() throws Exception { AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(flushAcknowledgement.getId()).thenReturn(JOB_ID); @@ -238,14 +241,14 @@ public void testProcessResult_flushAcknowledgementMustBeProcessedLast() { assertTrue(processorUnderTest.isDeleteInterimRequired()); InOrder inOrder = inOrder(persister, bulkBuilder, flushListener); - inOrder.verify(persister).bulkPersisterBuilder(JOB_ID); - inOrder.verify(persister).persistCategoryDefinition(categoryDefinition); + inOrder.verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + inOrder.verify(persister).persistCategoryDefinition(eq(categoryDefinition), any()); inOrder.verify(bulkBuilder).executeRequest(); inOrder.verify(persister).commitResultWrites(JOB_ID); inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null); } - public void testProcessResult_modelPlot() { + public void testProcessResult_modelPlot() throws Exception { AutodetectResult result = mock(AutodetectResult.class); ModelPlot modelPlot = mock(ModelPlot.class); when(result.getModelPlot()).thenReturn(modelPlot); @@ -253,11 +256,11 @@ public void testProcessResult_modelPlot() { processorUnderTest.setDeleteInterimRequired(false); processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(JOB_ID); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(bulkBuilder).persistModelPlot(modelPlot); } - public void testProcessResult_modelSizeStats() { + public void testProcessResult_modelSizeStats() throws Exception { AutodetectResult result = mock(AutodetectResult.class); ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); when(result.getModelSizeStats()).thenReturn(modelSizeStats); @@ -266,11 +269,11 @@ public void testProcessResult_modelSizeStats() { processorUnderTest.processResult(result); assertThat(processorUnderTest.modelSizeStats(), is(equalTo(modelSizeStats))); - verify(persister).bulkPersisterBuilder(JOB_ID); - verify(persister).persistModelSizeStats(modelSizeStats); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).persistModelSizeStats(eq(modelSizeStats), any()); } - public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { + public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() throws Exception { TimeValue delay = TimeValue.timeValueSeconds(5); // Set up schedule delay time when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString())) @@ -303,29 +306,30 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { when(result.getModelSizeStats()).thenReturn(modelSizeStats); 
processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(JOB_ID); - verify(persister, times(4)).persistModelSizeStats(any(ModelSizeStats.class)); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister, times(4)).persistModelSizeStats(any(ModelSizeStats.class), any()); // We should have only fired two notifications: one for soft_limit and one for hard_limit verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT)); verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb")); } - public void testProcessResult_modelSnapshot() { + public void testProcessResult_modelSnapshot() throws Exception { AutodetectResult result = mock(AutodetectResult.class); ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID) .setSnapshotId("a_snapshot_id") .setMinVersion(Version.CURRENT) .build(); when(result.getModelSnapshot()).thenReturn(modelSnapshot); + IndexResponse indexResponse = new IndexResponse(new ShardId("ml", "uid", 0), "1", 0L, 0L, 0L, true); - when(persister.persistModelSnapshot(any(), any())) - .thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "1", 0L, 0L, 0L, true)); + when(persister.persistModelSnapshot(any(), any(), any())) + .thenReturn(new BulkResponse(new BulkItemResponse[]{new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, indexResponse)}, 0)); processorUnderTest.setDeleteInterimRequired(false); processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(JOB_ID); - verify(persister).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).persistModelSnapshot(eq(modelSnapshot), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any()); UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID, new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build()); @@ -333,7 +337,7 @@ public void testProcessResult_modelSnapshot() { verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any()); } - public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { + public void testProcessResult_quantiles_givenRenormalizationIsEnabled() throws Exception { AutodetectResult result = mock(AutodetectResult.class); Quantiles quantiles = mock(Quantiles.class); when(result.getQuantiles()).thenReturn(quantiles); @@ -342,15 +346,15 @@ public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { processorUnderTest.setDeleteInterimRequired(false); processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(JOB_ID); - verify(persister).persistQuantiles(quantiles); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).persistQuantiles(eq(quantiles), any()); verify(bulkBuilder).executeRequest(); verify(persister).commitResultWrites(JOB_ID); verify(renormalizer).isEnabled(); verify(renormalizer).renormalize(quantiles); } - public void testProcessResult_quantiles_givenRenormalizationIsDisabled() { + public void testProcessResult_quantiles_givenRenormalizationIsDisabled() throws Exception { AutodetectResult result = mock(AutodetectResult.class); Quantiles quantiles = mock(Quantiles.class); when(result.getQuantiles()).thenReturn(quantiles); @@ -359,8 +363,8 @@ public void testProcessResult_quantiles_givenRenormalizationIsDisabled() { processorUnderTest.setDeleteInterimRequired(false); 
processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(JOB_ID); - verify(persister).persistQuantiles(quantiles); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).persistQuantiles(eq(quantiles), any()); verify(bulkBuilder).executeRequest(); verify(renormalizer).isEnabled(); } @@ -374,7 +378,7 @@ public void testAwaitCompletion() throws TimeoutException { assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L))); assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1))); - verify(persister).bulkPersisterBuilder(JOB_ID); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(persister).commitResultWrites(JOB_ID); verify(persister).commitStateWrites(JOB_ID); verify(renormalizer).waitUntilIdle(); @@ -389,12 +393,12 @@ public void testPersisterThrowingDoesntBlockProcessing() { when(process.isProcessAliveAfterWaiting()).thenReturn(true); when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult, autodetectResult).iterator()); - doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any(), any()); + doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any(), any(), any()); processorUnderTest.process(); - verify(persister).bulkPersisterBuilder(JOB_ID); - verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE)); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any()); } public void testParsingErrorSetsFailed() throws Exception { @@ -412,7 +416,7 @@ public void testParsingErrorSetsFailed() throws Exception { processorUnderTest.waitForFlushAcknowledgement(JOB_ID, Duration.of(300, ChronoUnit.SECONDS)); assertThat(flushAcknowledgement, is(nullValue())); - verify(persister).bulkPersisterBuilder(JOB_ID); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); } public void testKill() throws TimeoutException { @@ -425,7 +429,7 @@ public void testKill() throws TimeoutException { assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L))); assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1))); - verify(persister).bulkPersisterBuilder(JOB_ID); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(persister).commitResultWrites(JOB_ID); verify(persister).commitStateWrites(JOB_ID); verify(renormalizer, never()).renormalize(any()); @@ -433,4 +437,5 @@ public void testKill() throws TimeoutException { verify(renormalizer).waitUntilIdle(); verify(flushListener).clear(); } + } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java index cf65eec4f04df..05604d0f876c2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import 
org.elasticsearch.env.Environment; @@ -42,6 +41,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -106,7 +106,7 @@ public void testWrite_GivenTimeFormatIsEpochAndDataIsValid() throws IOException expectedRecords.add(new String[] { "2", "2.0", "" }); assertWrittenRecordsEqualTo(expectedRecords); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenTimeFormatIsEpochAndCategorization() throws IOException { @@ -143,7 +143,7 @@ public void testWrite_GivenTimeFormatIsEpochAndCategorization() throws IOExcepti } assertWrittenRecordsEqualTo(expectedRecords); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenTimeFormatIsEpochAndTimestampsAreOutOfOrder() throws IOException { @@ -166,7 +166,7 @@ public void testWrite_GivenTimeFormatIsEpochAndTimestampsAreOutOfOrder() throws verify(dataCountsReporter, times(2)).reportOutOfOrderRecord(2); verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong()); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenTimeFormatIsEpochAndAllRecordsAreOutOfOrder() throws IOException { @@ -190,7 +190,7 @@ public void testWrite_GivenTimeFormatIsEpochAndAllRecordsAreOutOfOrder() throws verify(dataCountsReporter, times(2)).reportOutOfOrderRecord(2); verify(dataCountsReporter, times(2)).reportLatestTimeIncrementalStats(anyLong()); verify(dataCountsReporter, never()).reportRecordWritten(anyLong(), anyLong()); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOutOfOrder() throws IOException { @@ -224,7 +224,7 @@ public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOu verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2); verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong()); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_NullByte() throws IOException { @@ -262,7 +262,7 @@ public void testWrite_NullByte() throws IOException { verify(dataCountsReporter, times(1)).reportRecordWritten(2, 3000); verify(dataCountsReporter, times(1)).reportRecordWritten(2, 4000); verify(dataCountsReporter, times(1)).reportDateParseError(2); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } @SuppressWarnings("unchecked") @@ -274,11 +274,7 @@ public void testWrite_EmptyInput() throws IOException { when(dataCountsReporter.incrementalStats()).thenReturn(new DataCounts("foo")); - doAnswer(invocation -> { - ActionListener<Boolean> listener = (ActionListener<Boolean>) invocation.getArguments()[0]; - listener.onResponse(true); - return null; - }).when(dataCountsReporter).finishReporting(any()); + doNothing().when(dataCountsReporter).finishReporting(); InputStream inputStream = createInputStream(""); CsvDataToProcessWriter writer = createWriter(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java index f16b388edee6f..89ff28928fdae 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java @@ -106,7 +106,7 @@ public void testWrite_GivenTimeFormatIsEpochAndDataIsValid() throws Exception { expectedRecords.add(new String[]{"2", "2.0", ""}); assertWrittenRecordsEqualTo(expectedRecords); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenTimeFormatIsEpochAndCategorization() throws Exception { @@ -142,7 +142,7 @@ public void testWrite_GivenTimeFormatIsEpochAndCategorization() throws Exception } assertWrittenRecordsEqualTo(expectedRecords); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenTimeFormatIsEpochAndTimestampsAreOutOfOrder() throws Exception { @@ -164,7 +164,7 @@ public void testWrite_GivenTimeFormatIsEpochAndTimestampsAreOutOfOrder() throws verify(dataCountsReporter, times(2)).reportOutOfOrderRecord(2); verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong()); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOutOfOrder() throws Exception { @@ -195,7 +195,7 @@ public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOu verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2); verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong()); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenMalformedJsonWithoutNestedLevels() throws Exception { @@ -223,7 +223,7 @@ public void testWrite_GivenMalformedJsonWithoutNestedLevels() throws Exception { assertWrittenRecordsEqualTo(expectedRecords); verify(dataCountsReporter).reportMissingFields(1); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenMalformedJsonWithNestedLevels() @@ -251,7 +251,7 @@ public void testWrite_GivenMalformedJsonWithNestedLevels() expectedRecords.add(new String[]{"3", "3.0", ""}); assertWrittenRecordsEqualTo(expectedRecords); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenMalformedJsonThatNeverRecovers() @@ -293,7 +293,7 @@ public void testWrite_GivenJsonWithArrayField() throws Exception { expectedRecords.add(new String[]{"2", "2.0", ""}); assertWrittenRecordsEqualTo(expectedRecords); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenJsonWithMissingFields() throws Exception { @@ -330,7 +330,7 @@ public void testWrite_GivenJsonWithMissingFields() throws Exception { verify(dataCountsReporter, times(1)).reportRecordWritten(1, 3000); verify(dataCountsReporter, times(1)).reportRecordWritten(1, 4000); verify(dataCountsReporter, times(1)).reportDateParseError(0); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } public void testWrite_Smile() throws Exception { @@ -367,7 +367,7 @@ 
public void testWrite_Smile() throws Exception { expectedRecords.add(new String[]{"2", "2.0", ""}); assertWrittenRecordsEqualTo(expectedRecords); - verify(dataCountsReporter).finishReporting(any()); + verify(dataCountsReporter).finishReporting(); } private static InputStream createInputStream(String input) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java new file mode 100644 index 0000000000000..9d5b922ccb0c6 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java @@ -0,0 +1,203 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.utils.persistence; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor; +import org.mockito.ArgumentCaptor; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ResultsPersisterServiceTests extends ESTestCase { + + private final String JOB_ID = "results_persister_test_job"; + private final Consumer<String> NULL_MSG_HANDLER = (msg) -> {}; + + public void testBulkRequestChangeOnFailures() { + IndexRequest indexRequestSuccess = new IndexRequest("my-index").id("success").source(Collections.singletonMap("data", "success")); + IndexRequest indexRequestFail = new IndexRequest("my-index").id("fail").source(Collections.singletonMap("data", "fail")); + BulkItemResponse successItem = new BulkItemResponse(1, + DocWriteRequest.OpType.INDEX, + new IndexResponse(new ShardId(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared", "uuid", 1), + indexRequestSuccess.id(), + 0, + 0, + 1, + true)); + BulkItemResponse 
failureItem = new BulkItemResponse(2, + DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("my-index", "fail", new Exception("boom"))); + BulkResponse withFailure = new BulkResponse(new BulkItemResponse[]{ failureItem, successItem }, 0L); + Client client = mockClientWithResponse(withFailure, new BulkResponse(new BulkItemResponse[0], 0L)); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(indexRequestFail); + bulkRequest.add(indexRequestSuccess); + + ResultsPersisterService resultsPersisterService = buildResultsPersisterService(client); + + resultsPersisterService.bulkIndexWithRetry(bulkRequest, JOB_ID, () -> true, NULL_MSG_HANDLER); + + ArgumentCaptor<BulkRequest> captor = ArgumentCaptor.forClass(BulkRequest.class); + verify(client, times(2)).bulk(captor.capture()); + + List<BulkRequest> requests = captor.getAllValues(); + + assertThat(requests.get(0).numberOfActions(), equalTo(2)); + assertThat(requests.get(1).numberOfActions(), equalTo(1)); + } + + public void testBulkRequestDoesNotRetryWhenSupplierIsFalse() { + IndexRequest indexRequestSuccess = new IndexRequest("my-index").id("success").source(Collections.singletonMap("data", "success")); + IndexRequest indexRequestFail = new IndexRequest("my-index").id("fail").source(Collections.singletonMap("data", "fail")); + BulkItemResponse successItem = new BulkItemResponse(1, + DocWriteRequest.OpType.INDEX, + new IndexResponse(new ShardId(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared", "uuid", 1), + indexRequestSuccess.id(), + 0, + 0, + 1, + true)); + BulkItemResponse failureItem = new BulkItemResponse(2, + DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("my-index", "fail", new Exception("boom"))); + BulkResponse withFailure = new BulkResponse(new BulkItemResponse[]{ failureItem, successItem }, 0L); + Client client = mockClientWithResponse(withFailure, new BulkResponse(new BulkItemResponse[0], 0L)); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(indexRequestFail); + bulkRequest.add(indexRequestSuccess); + + ResultsPersisterService resultsPersisterService = buildResultsPersisterService(client); + + expectThrows(ElasticsearchException.class, + () -> resultsPersisterService.bulkIndexWithRetry(bulkRequest, JOB_ID, () -> false, NULL_MSG_HANDLER)); + } + + public void testBulkRequestRetriesConfiguredAttemptNumber() { + IndexRequest indexRequestFail = new IndexRequest("my-index").id("fail").source(Collections.singletonMap("data", "fail")); + BulkItemResponse failureItem = new BulkItemResponse(2, + DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("my-index", "fail", new Exception("boom"))); + BulkResponse withFailure = new BulkResponse(new BulkItemResponse[]{ failureItem }, 0L); + Client client = mockClientWithResponse(withFailure); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(indexRequestFail); + + ResultsPersisterService resultsPersisterService = buildResultsPersisterService(client); + + resultsPersisterService.setMaxFailureRetries(1); + expectThrows(ElasticsearchException.class, + () -> resultsPersisterService.bulkIndexWithRetry(bulkRequest, JOB_ID, () -> true, NULL_MSG_HANDLER)); + verify(client, times(2)).bulk(any(BulkRequest.class)); + } + + public void testBulkRequestRetriesMsgHandlerIsCalled() { + IndexRequest indexRequestSuccess = new IndexRequest("my-index").id("success").source(Collections.singletonMap("data", "success")); + IndexRequest indexRequestFail = new IndexRequest("my-index").id("fail").source(Collections.singletonMap("data", "fail")); + BulkItemResponse 
successItem = new BulkItemResponse(1, + DocWriteRequest.OpType.INDEX, + new IndexResponse(new ShardId(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared", "uuid", 1), + indexRequestSuccess.id(), + 0, + 0, + 1, + true)); + BulkItemResponse failureItem = new BulkItemResponse(2, + DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("my-index", "fail", new Exception("boom"))); + BulkResponse withFailure = new BulkResponse(new BulkItemResponse[]{ failureItem, successItem }, 0L); + Client client = mockClientWithResponse(withFailure, new BulkResponse(new BulkItemResponse[0], 0L)); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(indexRequestFail); + bulkRequest.add(indexRequestSuccess); + + ResultsPersisterService resultsPersisterService = buildResultsPersisterService(client); + AtomicReference<String> msgHolder = new AtomicReference<>("not_called"); + + resultsPersisterService.bulkIndexWithRetry(bulkRequest, JOB_ID, () -> true, msgHolder::set); + + ArgumentCaptor<BulkRequest> captor = ArgumentCaptor.forClass(BulkRequest.class); + verify(client, times(2)).bulk(captor.capture()); + + List<BulkRequest> requests = captor.getAllValues(); + + assertThat(requests.get(0).numberOfActions(), equalTo(2)); + assertThat(requests.get(1).numberOfActions(), equalTo(1)); + assertThat(msgHolder.get(), containsString("failed to index after [1] attempts. Will attempt again in")); + } + + @SuppressWarnings("unchecked") + private Client mockClientWithResponse(BulkResponse... responses) { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + List<ActionFuture<BulkResponse>> futures = new ArrayList<>(responses.length - 1); + ActionFuture<BulkResponse> future1 = makeFuture(responses[0]); + for (int i = 1; i < responses.length; i++) { + futures.add(makeFuture(responses[i])); + } + when(client.bulk(any(BulkRequest.class))).thenReturn(future1, futures.toArray(ActionFuture[]::new)); + return client; + } + + @SuppressWarnings("unchecked") + private static ActionFuture<BulkResponse> makeFuture(BulkResponse response) { + ActionFuture<BulkResponse> future = mock(ActionFuture.class); + when(future.actionGet()).thenReturn(response); + return future; + } + + private ResultsPersisterService buildResultsPersisterService(Client client) { + ThreadPool tp = mock(ThreadPool.class); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, + new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS, + MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, + ClusterService.USER_DEFINED_META_DATA, + ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING))); + ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, tp); + + return new ResultsPersisterService(client, clusterService, Settings.EMPTY); + } +}
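
Reviewer note: the four tests above pin down the externally visible contract of ResultsPersisterService.bulkIndexWithRetry, which this diff introduces but whose implementation is not shown here: only the documents that failed are re-sent, the shouldRetry supplier can abort retrying, attempts are capped by the xpack.ml.persist_results_max_retries setting, and the message handler is notified on each retry. For readers who want that behaviour in one place, here is a minimal sketch of a loop with those semantics. It is not the ResultsPersisterService source; the class name RetrySketch, the maxRetries and msgHandler parameter names, and the exception messages are illustrative stand-ins inferred from the tests.

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

class RetrySketch {
    // Hypothetical stand-in for bulkIndexWithRetry; inferred from the test assertions above.
    static BulkResponse bulkIndexWithRetry(Client client, BulkRequest bulkRequest, String jobId,
                                           Supplier<Boolean> shouldRetry, int maxRetries,
                                           Consumer<String> msgHandler) {
        int attempt = 0;
        while (true) {
            BulkResponse response = client.bulk(bulkRequest).actionGet();
            if (response.hasFailures() == false) {
                return response;
            }
            attempt++;
            if (shouldRetry.get() == false) {
                throw new ElasticsearchException("[{}] failed to index all results. Interrupted.", jobId);
            }
            if (attempt > maxRetries) {
                throw new ElasticsearchException("[{}] failed to index all results after [{}] attempts.", jobId, maxRetries);
            }
            // Re-send only the documents whose bulk items failed; successes are dropped from
            // the request, which is what testBulkRequestChangeOnFailures asserts.
            Set<String> failedIds = new HashSet<>();
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed()) {
                    failedIds.add(item.getId());
                }
            }
            BulkRequest retry = new BulkRequest();
            for (DocWriteRequest<?> docWriteRequest : bulkRequest.requests()) {
                if (failedIds.contains(docWriteRequest.id())) {
                    retry.add(docWriteRequest);
                }
            }
            bulkRequest = retry;
            msgHandler.accept("failed to index after [" + attempt + "] attempts. Will attempt again in ...");
            // The real service would also wait here (e.g. randomized exponential backoff) before looping.
        }
    }
}

Run against the mocked client from the tests, this loop issues exactly two bulk calls for one transiently failing document (2 actions, then 1), aborts immediately when shouldRetry returns false, and throws after maxRetries + 1 bulk calls when the failure persists.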
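
A second note on the test plumbing: mockClientWithResponse leans on Mockito's consecutive stubbing, thenReturn(first, rest...), to hand the client a failing BulkResponse followed by a successful one. A minimal, self-contained illustration of that idiom (plain Mockito; the Fetcher interface is a made-up stand-in, nothing Elasticsearch-specific):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

interface Fetcher {
    String fetch();
}

class ConsecutiveStubbingExample {
    public static void main(String[] args) {
        Fetcher fetcher = mock(Fetcher.class);
        // thenReturn(first, rest...) queues answers: the first call yields "fail",
        // later calls yield "ok", and the last stubbed value repeats forever - which
        // is why a single failing BulkResponse suffices for the capped-retries test.
        when(fetcher.fetch()).thenReturn("fail", "ok");
        System.out.println(fetcher.fetch()); // fail
        System.out.println(fetcher.fetch()); // ok
        System.out.println(fetcher.fetch()); // ok
    }
}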