From 7c0161a4c02000bed885294a723b6779bbb849ed Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 9 Jan 2020 13:46:33 +0000 Subject: [PATCH 01/14] Calculate the results retention period based on the latest bucket time --- .../ml/integration/DeleteExpiredDataIT.java | 10 +-- .../TransportDeleteExpiredDataAction.java | 2 +- .../AbstractExpiredJobDataRemover.java | 13 +-- .../job/retention/ExpiredResultsRemover.java | 83 ++++++++++++++++++- .../retention/ExpiredResultsRemoverTests.java | 75 +++++++++++++++-- 5 files changed, 162 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 25e2e3f50f3c3..68410d50947e5 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -57,15 +57,15 @@ public void setUpData() throws IOException { .setMapping("time", "type=date,format=epoch_millis") .get(); - // We are going to create data for last 2 days - long nowMillis = System.currentTimeMillis(); + // We are going to create 2 days of data starting 24 hrs ago + long lastestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis(); int totalBuckets = 3 * 24; int normalRate = 10; int anomalousRate = 100; int anomalousBucket = 30; BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); for (int bucket = 0; bucket < totalBuckets; bucket++) { - long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis(); + long timestamp = lastestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis(); int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate; for (int point = 0; point < bucketRate; point++) { IndexRequest indexRequest = new IndexRequest(DATA_INDEX); @@ -208,7 +208,7 @@ public void testDeleteExpiredData() throws Exception { assertThat(getModelSnapshots("no-retention").size(), equalTo(2)); List buckets = getBuckets("results-retention"); - assertThat(buckets.size(), is(lessThanOrEqualTo(24))); + assertThat(buckets.size(), is(lessThanOrEqualTo(25))); assertThat(buckets.size(), is(greaterThanOrEqualTo(22))); assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo)); assertThat(getRecords("results-retention").size(), equalTo(0)); @@ -223,7 +223,7 @@ public void testDeleteExpiredData() throws Exception { assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2)); buckets = getBuckets("results-and-snapshots-retention"); - assertThat(buckets.size(), is(lessThanOrEqualTo(24))); + assertThat(buckets.size(), is(lessThanOrEqualTo(25))); assertThat(buckets.size(), is(greaterThanOrEqualTo(22))); assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo)); assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index 1ff0cf2208853..53a8859f53788 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -81,7 +81,7 @@ private void deleteExpiredData(ActionListener Supplier isTimedOutSupplier) { AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); List dataRemovers = Arrays.asList( - new ExpiredResultsRemover(client, auditor), + new ExpiredResultsRemover(client, auditor, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME), new ExpiredForecastsRemover(client, threadPool), new ExpiredModelSnapshotsRemover(client, threadPool), new UnusedStateRemover(client, clusterService) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index c6e3fe9dbf670..8e13d831ee928 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -68,9 +68,12 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure)); + + calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap( + cutoffEpochMs -> removeDataBefore(job, cutoffEpochMs, + ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure)), + listener::onFailure + )); } private WrappedBatchedJobsIterator newJobIterator() { @@ -78,9 +81,9 @@ private WrappedBatchedJobsIterator newJobIterator() { return new WrappedBatchedJobsIterator(jobsIterator); } - private long calcCutoffEpochMs(long retentionDays) { + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli(); - return nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); + listener.onResponse(nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis()); } protected abstract Long getRetentionDays(Job job); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index fff2c23ab75a6..bb78421c7bddd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -8,29 +8,53 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Forecast; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; +import java.io.IOException; +import java.io.InputStream; import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * Removes all results that have expired the configured retention time @@ -48,11 +72,15 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { private final OriginSettingClient client; private final AnomalyDetectionAuditor auditor; + private final ThreadPool threadPool; + private final String executor; - public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor) { + public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor, ThreadPool threadPool, String executor) { super(client); this.client = Objects.requireNonNull(client); this.auditor = Objects.requireNonNull(auditor); + this.threadPool = Objects.requireNonNull(threadPool); + this.executor= Objects.requireNonNull(executor); } @Override @@ -65,7 +93,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener() { + client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { try { @@ -107,6 +135,57 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { return request; } + @Override + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, executor, listener, false); + latestBucketTime(jobId, ActionListener.wrap( + latestTime -> { + long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); + threadedActionListener.onResponse(cutoff); + }, + listener::onFailure + )); + } + + private void latestBucketTime(String jobId, ActionListener listener) { + SortBuilder sortBuilder = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.DESC); + QueryBuilder bucketType = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.sort(sortBuilder); + searchSourceBuilder.query(bucketType); + searchSourceBuilder.size(1); + searchSourceBuilder.trackTotalHits(false); + + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source(searchSourceBuilder); + searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + SearchHit[] hits = response.getHits().getHits(); + if (hits.length == 0) { + // no buckets found + listener.onResponse(null); + } else { + + try (InputStream stream = hits[0].getSourceRef().streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + Bucket bucket = Bucket.LENIENT_PARSER.apply(parser, null); + listener.onResponse(bucket.getTimestamp().getTime()); + } catch (IOException e) { + listener.onFailure(new ElasticsearchParseException("failed to parse bucket", e)); + } + } + }, listener::onFailure + ), client::search); + + + } + private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) { Instant instant = Instant.ofEpochMilli(cutoffEpochMs); ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.systemDefault()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index b4c5a051fb8c1..e5957cb32a016 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -5,28 +5,34 @@ */ package org.elasticsearch.xpack.ml.job.retention; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; import org.junit.Before; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.equalTo; @@ -50,7 +56,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { public void setUpTests() { capturedDeleteByQueryRequests = new ArrayList<>(); - client = org.mockito.Mockito.mock(Client.class); + client = mock(Client.class); originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN); listener = mock(ActionListener.class); } @@ -88,6 +94,8 @@ public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); + givenBucket(new Bucket("id_not_important", new Date(), 60)); + createExpiredResultsRemover().remove(listener, () -> false); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); @@ -106,6 +114,8 @@ public void testRemove_GivenTimeout() throws Exception { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); + givenBucket(new Bucket("id_not_important", new Date(), 60)); + final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); @@ -124,6 +134,8 @@ public void testRemove_GivenClientRequestsFailed() throws IOException { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); + givenBucket(new Bucket("id_not_important", new Date(), 60)); + createExpiredResultsRemover().remove(listener, () -> false); assertThat(capturedDeleteByQueryRequests.size(), equalTo(1)); @@ -132,6 +144,22 @@ public void testRemove_GivenClientRequestsFailed() throws IOException { verify(listener).onFailure(any()); } + @SuppressWarnings("unchecked") + public void testCalcCutoffEpochMs() throws IOException { + String jobId = "calc-cutoff"; + givenJobs(Collections.singletonList(JobTests.buildJobBuilder(jobId).setResultsRetentionDays(1L).build())); + + Date latest = new Date(); + givenBucket(new Bucket(jobId, latest, 60)); + + ActionListener cutoffListener = mock(ActionListener.class); + createExpiredResultsRemover().calcCutoffEpochMs(jobId, 1L, cutoffListener); + + long dayInMills = 60 * 60 * 24 * 1000; + long expectedCutoffTime = latest.getTime() - dayInMills; + verify(cutoffListener).onResponse(eq(expectedCutoffTime)); + } + private void givenClientRequestsSucceed() { givenClientRequests(true); } @@ -142,9 +170,7 @@ private void givenClientRequestsFailed() { @SuppressWarnings("unchecked") private void givenClientRequests(boolean shouldSucceed) { - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) { + doAnswer(invocationOnMock -> { capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; @@ -157,10 +183,43 @@ public Void answer(InvocationOnMock invocationOnMock) { } return null; } - }).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); + ).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); + } + + @SuppressWarnings("unchecked") + private void givenJobs(List jobs) throws IOException { + SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); + + ActionFuture future = mock(ActionFuture.class); + when(future.actionGet()).thenReturn(response); + when(client.search(any())).thenReturn(future); + } + + @SuppressWarnings("unchecked") + private void givenBucket(Bucket bucket) throws IOException { + SearchResponse searchResponse = AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(bucket)); + + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(searchResponse); + return null; + }).when(client).search(any(SearchRequest.class), any(ActionListener.class)); } private ExpiredResultsRemover createExpiredResultsRemover() { - return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class)); + final String executorName = "expired-remover-test"; + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executor = mock(ExecutorService.class); + + when(threadPool.executor(eq(executorName))).thenReturn(executor); + + doAnswer(invocationOnMock -> { + Runnable run = (Runnable) invocationOnMock.getArguments()[0]; + run.run(); + return null; + } + ).when(executor).execute(any()); + + return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class), threadPool, executorName); } } From 7ca8db9d6610516ed415718d7db0c02be1841401 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 9 Jan 2020 14:02:03 +0000 Subject: [PATCH 02/14] Define retention period in docs --- docs/reference/ml/ml-shared.asciidoc | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index f25cfb94e8bed..4323789ab42c8 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -963,10 +963,11 @@ is `shared`, which generates an index named `.ml-anomalies-shared`. end::results-index-name[] tag::results-retention-days[] -Advanced configuration option. The number of days for which job results are -retained. Once per day at 00:30 (server time), results older than this period -are deleted from {es}. The default value is null, which means results are -retained. +Advanced configuration option. When set this option will prune results older +than a certain age. The age of a result is calculated as the time difference +from the result to the latest result. Once per day at 00:30 (server time), +results older than this period are deleted from {es}. The default value is +null, which means all results are retained. end::results-retention-days[] tag::retain[] From f620d6b1071b18c37af33f72b87beecd532b048b Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 9 Jan 2020 15:21:04 +0000 Subject: [PATCH 03/14] Start expired snapshots --- .../TransportDeleteExpiredDataAction.java | 2 +- .../ExpiredModelSnapshotsRemover.java | 57 ++++++++++++++++++- .../job/retention/ExpiredResultsRemover.java | 10 ++-- .../ExpiredModelSnapshotsRemoverTests.java | 5 +- .../retention/ExpiredResultsRemoverTests.java | 6 +- 5 files changed, 68 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index 53a8859f53788..273fe202e478b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -81,7 +81,7 @@ private void deleteExpiredData(ActionListener Supplier isTimedOutSupplier) { AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); List dataRemovers = Arrays.asList( - new ExpiredResultsRemover(client, auditor, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME), + new ExpiredResultsRemover(client, auditor, threadPool), new ExpiredForecastsRemover(client, threadPool), new ExpiredModelSnapshotsRemover(client, threadPool), new UnusedStateRemover(client, clusterService) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 221f9d9debf87..ceaa952ee662c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -15,10 +15,14 @@ import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -26,13 +30,20 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * Deletes all model snapshots that have expired the configured retention time @@ -69,6 +80,50 @@ protected Long getRetentionDays(Job job) { return job.getModelSnapshotRetentionDays(); } + @Override + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, + MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); + + latestSnapshotTimeStamp(jobId, ActionListener.wrap( + latestTime -> { + long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); + threadedActionListener.onResponse(cutoff); + }, + listener::onFailure + )); + } + + private void latestSnapshotTimeStamp(String jobId, ActionListener listener) { + SortBuilder sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC); + QueryBuilder bucketType = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.sort(sortBuilder); + searchSourceBuilder.query(bucketType); + searchSourceBuilder.size(1); + searchSourceBuilder.trackTotalHits(false); + + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source(searchSourceBuilder); + searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + SearchHit[] hits = response.getHits().getHits(); + if (hits.length == 0) { + // no snapshots found + listener.onResponse(null); + } else { + ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef()); + listener.onResponse(snapshot.getTimestamp().getTime()); + } + }, listener::onFailure + ), client::search); + } + @Override protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { if (job.getModelSnapshotId() == null) { @@ -96,7 +151,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener expiredSnapshotsListener(String jobId, ActionListener listener) { - return new ActionListener() { + return new ActionListener<>() { @Override public void onResponse(SearchResponse searchResponse) { try { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index bb78421c7bddd..e266e968729a6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Forecast; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; @@ -73,14 +74,12 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { private final OriginSettingClient client; private final AnomalyDetectionAuditor auditor; private final ThreadPool threadPool; - private final String executor; - public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor, ThreadPool threadPool, String executor) { + public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor, ThreadPool threadPool) { super(client); this.client = Objects.requireNonNull(client); this.auditor = Objects.requireNonNull(auditor); this.threadPool = Objects.requireNonNull(threadPool); - this.executor= Objects.requireNonNull(executor); } @Override @@ -137,7 +136,8 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { @Override void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { - ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, executor, listener, false); + ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, + MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); latestBucketTime(jobId, ActionListener.wrap( latestTime -> { long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); @@ -182,8 +182,6 @@ private void latestBucketTime(String jobId, ActionListener listener) { } }, listener::onFailure ), client::search); - - } private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 6e332bf148d17..2ad62dfb2b92b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -218,6 +218,10 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); } + public void testCalcCutoffEpochMs() { + + } + private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool); } @@ -283,5 +287,4 @@ public Void answer(InvocationOnMock invocationOnMock) { } }).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); } - } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index e5957cb32a016..891cf889bbe8e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; import org.junit.Before; @@ -207,11 +208,10 @@ private void givenBucket(Bucket bucket) throws IOException { } private ExpiredResultsRemover createExpiredResultsRemover() { - final String executorName = "expired-remover-test"; ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executor = mock(ExecutorService.class); - when(threadPool.executor(eq(executorName))).thenReturn(executor); + when(threadPool.executor(eq(MachineLearning.UTILITY_THREAD_POOL_NAME))).thenReturn(executor); doAnswer(invocationOnMock -> { Runnable run = (Runnable) invocationOnMock.getArguments()[0]; @@ -220,6 +220,6 @@ private ExpiredResultsRemover createExpiredResultsRemover() { } ).when(executor).execute(any()); - return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class), threadPool, executorName); + return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class), threadPool); } } From 6c56dda250b1f3cdf59da1c3e2ed19dc1dfde66d Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 9 Jan 2020 16:40:19 +0000 Subject: [PATCH 04/14] Adapt for origin setting client --- .../ExpiredModelSnapshotsRemover.java | 29 ++++++------- .../job/retention/ExpiredResultsRemover.java | 43 ++++++++----------- 2 files changed, 32 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index ceaa952ee662c..da6a37c964ffe 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -42,9 +42,6 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; - /** * Deletes all model snapshots that have expired the configured retention time * of their respective job with the exception of the currently used snapshot. @@ -109,19 +106,19 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener listener searchRequest.source(searchSourceBuilder); searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, - ActionListener.wrap( - response -> { - SearchHit[] hits = response.getHits().getHits(); - if (hits.length == 0) { - // no snapshots found - listener.onResponse(null); - } else { - ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef()); - listener.onResponse(snapshot.getTimestamp().getTime()); - } - }, listener::onFailure - ), client::search); + client.search(searchRequest, ActionListener.wrap( + response -> { + SearchHit[] hits = response.getHits().getHits(); + if (hits.length == 0) { + // no snapshots found + listener.onResponse(null); + } else { + ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef()); + listener.onResponse(snapshot.getTimestamp().getTime()); + } + }, + listener::onFailure) + ); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index e266e968729a6..ade7d199fbe96 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -11,7 +11,6 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.unit.TimeValue; @@ -54,9 +53,6 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; - /** * Removes all results that have expired the configured retention time * of their respective job. A result is deleted if its timestamp is earlier @@ -162,26 +158,25 @@ private void latestBucketTime(String jobId, ActionListener listener) { searchRequest.source(searchSourceBuilder); searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, - ActionListener.wrap( - response -> { - SearchHit[] hits = response.getHits().getHits(); - if (hits.length == 0) { - // no buckets found - listener.onResponse(null); - } else { - - try (InputStream stream = hits[0].getSourceRef().streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON) - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { - Bucket bucket = Bucket.LENIENT_PARSER.apply(parser, null); - listener.onResponse(bucket.getTimestamp().getTime()); - } catch (IOException e) { - listener.onFailure(new ElasticsearchParseException("failed to parse bucket", e)); - } - } - }, listener::onFailure - ), client::search); + client.search(searchRequest, ActionListener.wrap( + response -> { + SearchHit[] hits = response.getHits().getHits(); + if (hits.length == 0) { + // no buckets found + listener.onResponse(null); + } else { + + try (InputStream stream = hits[0].getSourceRef().streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + Bucket bucket = Bucket.LENIENT_PARSER.apply(parser, null); + listener.onResponse(bucket.getTimestamp().getTime()); + } catch (IOException e) { + listener.onFailure(new ElasticsearchParseException("failed to parse bucket", e)); + } + } + }, listener::onFailure + )); } private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) { From 11fa81bd8467c8b6c086c763d6af69bb95778f17 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 15 Jan 2020 16:48:31 +0000 Subject: [PATCH 05/14] Fix the tests --- .../AbstractExpiredJobDataRemover.java | 11 +- .../ExpiredModelSnapshotsRemover.java | 15 +- .../job/retention/ExpiredResultsRemover.java | 8 +- .../AbstractExpiredJobDataRemoverTests.java | 8 +- .../ExpiredModelSnapshotsRemoverTests.java | 231 +++++++++--------- .../retention/ExpiredResultsRemoverTests.java | 94 ++++--- 6 files changed, 186 insertions(+), 181 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index 8e13d831ee928..dbe7cf27f9a16 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -70,8 +70,15 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener removeDataBefore(job, cutoffEpochMs, - ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure)), + cutoffEpochMs -> { + if (cutoffEpochMs == null) { + removeData(jobIterator, listener, isTimedOutSupplier); + } else { + removeDataBefore(job, cutoffEpochMs, ActionListener.wrap( + response -> removeData(jobIterator, listener, isTimedOutSupplier), + listener::onFailure)); + } + }, listener::onFailure )); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index da6a37c964ffe..a5eea9e3876a5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -30,8 +30,6 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; -import org.elasticsearch.xpack.core.ml.job.results.Bucket; -import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; @@ -84,8 +82,12 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener li latestSnapshotTimeStamp(jobId, ActionListener.wrap( latestTime -> { - long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); - threadedActionListener.onResponse(cutoff); + if (latestTime == null) { + threadedActionListener.onResponse(null); + } else { + long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); + threadedActionListener.onResponse(cutoff); + } }, listener::onFailure )); @@ -93,11 +95,12 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener li private void latestSnapshotTimeStamp(String jobId, ActionListener listener) { SortBuilder sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC); - QueryBuilder bucketType = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE); + QueryBuilder snapshotQuery = QueryBuilders.boolQuery() + .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.sort(sortBuilder); - searchSourceBuilder.query(bucketType); + searchSourceBuilder.query(snapshotQuery); searchSourceBuilder.size(1); searchSourceBuilder.trackTotalHits(false); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index ade7d199fbe96..862a945a5fab8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -136,8 +136,12 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener li MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); latestBucketTime(jobId, ActionListener.wrap( latestTime -> { - long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); - threadedActionListener.onResponse(cutoff); + if (latestTime == null) { + threadedActionListener.onResponse(null); + } else { + long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); + threadedActionListener.onResponse(cutoff); + } }, listener::onFailure )); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index eb29ba06b17ca..2d115d4fba2cf 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -82,10 +82,10 @@ static SearchResponse createSearchResponse(List toXContent static void givenJobs(Client client, List jobs) throws IOException { SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); - doAnswer(invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(response); - return null; + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(response); + return null; }).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 2ad62dfb2b92b..a178cd48b7cad 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -12,19 +12,16 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; -import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; -import org.junit.After; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -33,8 +30,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener; @@ -45,114 +43,97 @@ import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { private Client client; private OriginSettingClient originSettingClient; - private ThreadPool threadPool; private List capturedSearchRequests; private List capturedDeleteModelSnapshotRequests; - private List searchResponsesPerCall; private TestListener listener; @Before public void setUpTests() { capturedSearchRequests = new ArrayList<>(); capturedDeleteModelSnapshotRequests = new ArrayList<>(); - searchResponsesPerCall = new ArrayList<>(); client = mock(Client.class); originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN); listener = new TestListener(); - - // Init thread pool - Settings settings = Settings.builder() - .put("node.name", "expired_model_snapshots_remover_test") - .build(); - threadPool = new ThreadPool(settings, - new FixedExecutorBuilder(settings, MachineLearning.UTILITY_THREAD_POOL_NAME, 1, 1000, "")); - } - - @After - public void shutdownThreadPool() { - terminate(threadPool); - } - - public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { - givenClientRequestsSucceed(Arrays.asList( - JobTests.buildJobBuilder("foo").build(), - JobTests.buildJobBuilder("bar").build() - )); - - createExpiredModelSnapshotsRemover().remove(listener, () -> false); - - listener.waitToCompletion(); - assertThat(listener.success, is(true)); - verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); } public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException { - givenClientRequestsSucceed(Collections.singletonList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); + List responses = Arrays.asList( + AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(JobTests.buildJobBuilder("foo") + .setModelSnapshotRetentionDays(7L).build())), + AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); + + givenClientRequestsSucceed(responses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); - verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); + verify(client, times(2)).execute(eq(SearchAction.INSTANCE), any(), any()); } public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException { - givenClientRequestsSucceed( - Arrays.asList( - JobTests.buildJobBuilder("none").build(), - JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), - JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() - )); - - List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), - createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); - + List searchResponses = new ArrayList<>(); + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList( + JobTests.buildJobBuilder("job-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), + JobTests.buildJobBuilder("job-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() + ))); + + Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); + ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + + Date eightDaysAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(8).getMillis()); + ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted))); + + ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1))); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); + + givenClientRequestsSucceed(searchResponses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); - assertThat(capturedSearchRequests.size(), equalTo(2)); - SearchRequest searchRequest = capturedSearchRequests.get(0); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); - searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")})); + assertThat(capturedSearchRequests.size(), equalTo(5)); + SearchRequest searchRequest = capturedSearchRequests.get(1); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-1")})); + searchRequest = capturedSearchRequests.get(3); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-2")})); - assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3)); + assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); - deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2")); - deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1")); + assertThat(deleteSnapshotRequest.getJobId(), equalTo("job-1")); + assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("old-snapshot")); } public void testRemove_GivenTimeout() throws IOException { - givenClientRequestsSucceed( - Arrays.asList( + List searchResponses = new ArrayList<>(); + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList( JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() - )); + ))); List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), createModelSnapshot("snapshots-1", "snapshots-1_2")); List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + + givenClientRequestsSucceed(searchResponses); final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); @@ -164,52 +145,53 @@ public void testRemove_GivenTimeout() throws IOException { } public void testRemove_GivenClientSearchRequestsFail() throws IOException { - givenClientSearchRequestsFail( - Arrays.asList( - JobTests.buildJobBuilder("none").build(), + List searchResponses = new ArrayList<>(); + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList( JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() - )); - - List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), - createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + ))); + givenClientSearchRequestsFail(searchResponses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); - assertThat(capturedSearchRequests.size(), equalTo(1)); - SearchRequest searchRequest = capturedSearchRequests.get(0); + assertThat(capturedSearchRequests.size(), equalTo(2)); + SearchRequest searchRequest = capturedSearchRequests.get(1); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0)); } public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOException { - givenClientDeleteModelSnapshotRequestsFail( - Arrays.asList( - JobTests.buildJobBuilder("none").build(), + List searchResponses = new ArrayList<>(); + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList( JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() - )); + ))); - List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), + ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1"); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + + List snapshots1JobSnapshots = Arrays.asList( + snapshot1_1, createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + + ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1"); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2))); + givenClientDeleteModelSnapshotRequestsFail(searchResponses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); - assertThat(capturedSearchRequests.size(), equalTo(1)); - SearchRequest searchRequest = capturedSearchRequests.get(0); + assertThat(capturedSearchRequests.size(), equalTo(3)); + SearchRequest searchRequest = capturedSearchRequests.get(1); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); @@ -218,53 +200,76 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); } - public void testCalcCutoffEpochMs() { + @SuppressWarnings("unchecked") + public void testCalcCutoffEpochMs() throws IOException { + List searchResponses = new ArrayList<>(); + + Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); + ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "newest-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + + givenClientRequests(searchResponses, true, true); + + long retentionDays = 3L; + ActionListener cutoffListener = mock(ActionListener.class); + createExpiredModelSnapshotsRemover().calcCutoffEpochMs("job-1", retentionDays, cutoffListener); + long dayInMills = 60 * 60 * 24 * 1000; + long expectedCutoffTime = oneDayAgo.getTime() - (dayInMills * retentionDays); + verify(cutoffListener).onResponse(eq(expectedCutoffTime)); } private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executor = mock(ExecutorService.class); + + when(threadPool.executor(eq(MachineLearning.UTILITY_THREAD_POOL_NAME))).thenReturn(executor); + + doAnswer(invocationOnMock -> { + Runnable run = (Runnable) invocationOnMock.getArguments()[0]; + run.run(); + return null; + } + ).when(executor).execute(any()); return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool); } private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) { - return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).build(); + return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(new Date()).build(); + } + + private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) { + return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build(); } - private void givenClientRequestsSucceed(List jobs) throws IOException { - givenClientRequests(jobs, true, true); + private void givenClientRequestsSucceed(List searchResponses) { + givenClientRequests(searchResponses, true, true); } - private void givenClientSearchRequestsFail(List jobs) throws IOException { - givenClientRequests(jobs, false, true); + private void givenClientSearchRequestsFail(List searchResponses) { + givenClientRequests(searchResponses, false, true); } - private void givenClientDeleteModelSnapshotRequestsFail(List jobs) throws IOException { - givenClientRequests(jobs, true, false); + private void givenClientDeleteModelSnapshotRequestsFail(List searchResponses) { + givenClientRequests(searchResponses, true, false); } @SuppressWarnings("unchecked") - private void givenClientRequests(List jobs, - boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) throws IOException { - SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); + private void givenClientRequests(List searchResponses, + boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) { doAnswer(new Answer() { - int callCount = 0; - AtomicBoolean isJobQuery = new AtomicBoolean(true); + AtomicInteger callCount = new AtomicInteger(); @Override public Void answer(InvocationOnMock invocationOnMock) { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - if (isJobQuery.get()) { - listener.onResponse(response); - isJobQuery.set(false); - return null; - } - SearchRequest searchRequest = (SearchRequest) invocationOnMock.getArguments()[1]; capturedSearchRequests.add(searchRequest); - if (shouldSearchRequestsSucceed) { - listener.onResponse(searchResponsesPerCall.get(callCount++)); + // Only the last search request should fail + if (shouldSearchRequestsSucceed || callCount.get() < searchResponses.size()) { + listener.onResponse(searchResponses.get(callCount.getAndIncrement())); } else { listener.onFailure(new RuntimeException("search failed")); } @@ -272,9 +277,7 @@ public Void answer(InvocationOnMock invocationOnMock) { } }).when(client).execute(same(SearchAction.INSTANCE), any(), any()); - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) { + doAnswer(invocationOnMock -> { capturedDeleteModelSnapshotRequests.add((DeleteModelSnapshotAction.Request) invocationOnMock.getArguments()[1]); ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; @@ -285,6 +288,6 @@ public Void answer(InvocationOnMock invocationOnMock) { } return null; } - }).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); + ).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index 891cf889bbe8e..29c8dad0c668c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.job.retention; -import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; @@ -63,7 +62,7 @@ public void setUpTests() { } public void testRemove_GivenNoJobs() throws IOException { - givenClientRequestsSucceed(); + givenDBQRequestsSucceed(); AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList()); createExpiredResultsRemover().remove(listener, () -> false); @@ -73,7 +72,7 @@ public void testRemove_GivenNoJobs() throws IOException { } public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { - givenClientRequestsSucceed(); + givenDBQRequestsSucceed(); AbstractExpiredJobDataRemoverTests.givenJobs(client, Arrays.asList( JobTests.buildJobBuilder("foo").build(), @@ -86,16 +85,14 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); } - public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception { - givenClientRequestsSucceed(); - AbstractExpiredJobDataRemoverTests.givenJobs(client, - Arrays.asList( + public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() { + givenDBQRequestsSucceed(); + + givenSearchResponses(Arrays.asList( JobTests.buildJobBuilder("none").build(), JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), - JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() - )); - - givenBucket(new Bucket("id_not_important", new Date(), 60)); + JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()), + new Bucket("id_not_important", new Date(), 60)); createExpiredResultsRemover().remove(listener, () -> false); @@ -107,15 +104,12 @@ public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception verify(listener).onResponse(true); } - public void testRemove_GivenTimeout() throws Exception { - givenClientRequestsSucceed(); - AbstractExpiredJobDataRemoverTests.givenJobs(client, - Arrays.asList( - JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), - JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() - )); - - givenBucket(new Bucket("id_not_important", new Date(), 60)); + public void testRemove_GivenTimeout() { + givenDBQRequestsSucceed(); + givenSearchResponses(Arrays.asList( + JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), + JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() + ), new Bucket("id_not_important", new Date(), 60)); final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); @@ -126,16 +120,14 @@ public void testRemove_GivenTimeout() throws Exception { verify(listener).onResponse(false); } - public void testRemove_GivenClientRequestsFailed() throws IOException { - givenClientRequestsFailed(); - AbstractExpiredJobDataRemoverTests.givenJobs(client, + public void testRemove_GivenClientRequestsFailed() { + givenDBQRequestsFailed(); + givenSearchResponses( Arrays.asList( - JobTests.buildJobBuilder("none").build(), - JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), - JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() - )); - - givenBucket(new Bucket("id_not_important", new Date(), 60)); + JobTests.buildJobBuilder("none").build(), + JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), + JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()), + new Bucket("id_not_important", new Date(), 60)); createExpiredResultsRemover().remove(listener, () -> false); @@ -146,12 +138,12 @@ public void testRemove_GivenClientRequestsFailed() throws IOException { } @SuppressWarnings("unchecked") - public void testCalcCutoffEpochMs() throws IOException { + public void testCalcCutoffEpochMs() { String jobId = "calc-cutoff"; - givenJobs(Collections.singletonList(JobTests.buildJobBuilder(jobId).setResultsRetentionDays(1L).build())); - Date latest = new Date(); - givenBucket(new Bucket(jobId, latest, 60)); + + givenSearchResponses(Collections.singletonList(JobTests.buildJobBuilder(jobId).setResultsRetentionDays(1L).build()), + new Bucket(jobId, latest, 60)); ActionListener cutoffListener = mock(ActionListener.class); createExpiredResultsRemover().calcCutoffEpochMs(jobId, 1L, cutoffListener); @@ -161,16 +153,16 @@ public void testCalcCutoffEpochMs() throws IOException { verify(cutoffListener).onResponse(eq(expectedCutoffTime)); } - private void givenClientRequestsSucceed() { - givenClientRequests(true); + private void givenDBQRequestsSucceed() { + givenDBQRequest(true); } - private void givenClientRequestsFailed() { - givenClientRequests(false); + private void givenDBQRequestsFailed() { + givenDBQRequest(false); } @SuppressWarnings("unchecked") - private void givenClientRequests(boolean shouldSucceed) { + private void givenDBQRequest(boolean shouldSucceed) { doAnswer(invocationOnMock -> { capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); ActionListener listener = @@ -188,23 +180,19 @@ private void givenClientRequests(boolean shouldSucceed) { } @SuppressWarnings("unchecked") - private void givenJobs(List jobs) throws IOException { - SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); - - ActionFuture future = mock(ActionFuture.class); - when(future.actionGet()).thenReturn(response); - when(client.search(any())).thenReturn(future); - } - - @SuppressWarnings("unchecked") - private void givenBucket(Bucket bucket) throws IOException { - SearchResponse searchResponse = AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(bucket)); - + private void givenSearchResponses(List jobs, Bucket bucket) { doAnswer(invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; - listener.onResponse(searchResponse); + SearchRequest request = (SearchRequest) invocationOnMock.getArguments()[1]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + + if (request.indices()[0].startsWith(AnomalyDetectorsIndex.jobResultsIndexPrefix())) { + // asking for the bucket result + listener.onResponse(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(bucket))); + } else { + listener.onResponse(AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs)); + } return null; - }).when(client).search(any(SearchRequest.class), any(ActionListener.class)); + }).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); } private ExpiredResultsRemover createExpiredResultsRemover() { From aee2d136a65b62dbd462fe4e57e12c0194d02f7c Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 16 Jan 2020 10:34:56 +0000 Subject: [PATCH 06/14] Rework docs --- docs/reference/ml/ml-shared.asciidoc | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index 4323789ab42c8..0159514eb2529 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -863,9 +863,10 @@ example, `1575402236000 `. end::model-snapshot-id[] tag::model-snapshot-retention-days[] -The time in days that model snapshots are retained for the job. Older snapshots -are deleted. The default value is `1`, which means snapshots are retained for -one day (twenty-four hours). +Advanced configuration option. Denotes the period for which model snapshots +will be retained. Age is calculated as the time from the newest model snapshot's +timestamp, older snapshots are automatically deleted. The default value is `1`, +which means snapshots are retained for one day (twenty-four hours). end::model-snapshot-retention-days[] tag::multivariate-by-fields[] @@ -964,10 +965,10 @@ end::results-index-name[] tag::results-retention-days[] Advanced configuration option. When set this option will prune results older -than a certain age. The age of a result is calculated as the time difference -from the result to the latest result. Once per day at 00:30 (server time), -results older than this period are deleted from {es}. The default value is -null, which means all results are retained. +than this age. Age is calculated as the difference between the latest +bucket result's timestamp and the result's timestamp. Once per day at 00:30 +(server time), results older than this are deleted from {es}. The default +value is null, which means all results are retained. end::results-retention-days[] tag::retain[] From 498f5b0adea88ab5e7d8e50d45147946a0f8f69c Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 20 Jan 2020 16:40:14 +0000 Subject: [PATCH 07/14] Address review comments --- .../ml/integration/DeleteExpiredDataIT.java | 6 +++--- .../retention/AbstractExpiredJobDataRemover.java | 10 ++-------- .../AbstractExpiredJobDataRemoverTests.java | 16 ++++++++++++---- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 68410d50947e5..08d139c27c25e 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -57,15 +57,15 @@ public void setUpData() throws IOException { .setMapping("time", "type=date,format=epoch_millis") .get(); - // We are going to create 2 days of data starting 24 hrs ago - long lastestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis(); + // We are going to create 3 days of data starting 1 hr ago + long latestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis(); int totalBuckets = 3 * 24; int normalRate = 10; int anomalousRate = 100; int anomalousBucket = 30; BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); for (int bucket = 0; bucket < totalBuckets; bucket++) { - long timestamp = lastestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis(); + long timestamp = latestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis(); int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate; for (int point = 0; point < bucketRate; point++) { IndexRequest indexRequest = new IndexRequest(DATA_INDEX); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index dbe7cf27f9a16..c0535d32e8f7e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -7,7 +7,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -16,12 +15,9 @@ import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; -import java.time.Clock; -import java.time.Instant; import java.util.Deque; import java.util.Iterator; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -88,10 +84,8 @@ private WrappedBatchedJobsIterator newJobIterator() { return new WrappedBatchedJobsIterator(jobsIterator); } - void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { - long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli(); - listener.onResponse(nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis()); - } + // package-private for testing + abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener); protected abstract Long getRetentionDays(Job job); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 2d115d4fba2cf..2d7f3d1b32454 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -25,6 +26,8 @@ import org.junit.Before; import java.io.IOException; +import java.time.Clock; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -59,6 +62,11 @@ protected Long getRetentionDays(Job job) { return randomBoolean() ? null : 0L; } + protected void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli(); + listener.onResponse(nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis()); + } + @Override protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { listener.onResponse(Boolean.TRUE); @@ -82,10 +90,10 @@ static SearchResponse createSearchResponse(List toXContent static void givenJobs(Client client, List jobs) throws IOException { SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); - doAnswer(invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(response); - return null; + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(response); + return null; }).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); } From 580c51765c28ea3d23112ea6e044f505f9ad68ea Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 20 Jan 2020 17:24:47 +0000 Subject: [PATCH 08/14] Make package-private --- .../xpack/ml/job/retention/AbstractExpiredJobDataRemover.java | 3 +-- .../xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java | 2 +- .../xpack/ml/job/retention/ExpiredResultsRemover.java | 2 +- .../ml/job/retention/AbstractExpiredJobDataRemoverTests.java | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index c0535d32e8f7e..e0fd4f224cc3b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -84,10 +84,9 @@ private WrappedBatchedJobsIterator newJobIterator() { return new WrappedBatchedJobsIterator(jobsIterator); } - // package-private for testing abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener); - protected abstract Long getRetentionDays(Job job); + abstract Long getRetentionDays(Job job); /** * Template method to allow implementation details of various types of data (e.g. results, model snapshots). diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index a5eea9e3876a5..e76f1c7b13052 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -71,7 +71,7 @@ public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threa } @Override - protected Long getRetentionDays(Job job) { + Long getRetentionDays(Job job) { return job.getModelSnapshotRetentionDays(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index 862a945a5fab8..16ba77c582e8f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -79,7 +79,7 @@ public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor } @Override - protected Long getRetentionDays(Job job) { + Long getRetentionDays(Job job) { return job.getResultsRetentionDays(); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 2d7f3d1b32454..0d8955c05774f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -62,7 +62,7 @@ protected Long getRetentionDays(Job job) { return randomBoolean() ? null : 0L; } - protected void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli(); listener.onResponse(nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis()); } From e7724b9e6a57bf6a4f499ae75e197ba11b5fef60 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 20 Jan 2020 17:50:34 +0000 Subject: [PATCH 09/14] nits --- .../xpack/ml/integration/DeleteExpiredDataIT.java | 10 ++++------ .../job/retention/AbstractExpiredJobDataRemover.java | 4 ++-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 08d139c27c25e..c420567754b8d 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -33,10 +33,8 @@ import org.junit.After; import org.junit.Before; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -52,12 +50,12 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { private static final String DATA_INDEX = "delete-expired-data-test-data"; @Before - public void setUpData() throws IOException { + public void setUpData() { client().admin().indices().prepareCreate(DATA_INDEX) .setMapping("time", "type=date,format=epoch_millis") .get(); - // We are going to create 3 days of data starting 1 hr ago + // We are going to create 3 days of data ending 1 hr ago long latestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis(); int totalBuckets = 3 * 24; int normalRate = 10; @@ -120,7 +118,7 @@ public void testDeleteExpiredData() throws Exception { String datafeedId = job.getId() + "-feed"; DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId()); - datafeedConfig.setIndices(Arrays.asList(DATA_INDEX)); + datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX)); DatafeedConfig datafeed = datafeedConfig.build(); registerDatafeed(datafeed); putDatafeed(datafeed); @@ -276,7 +274,7 @@ public void testDeleteExpiredData() throws Exception { private static Job.Builder newJobBuilder(String id) { Detector.Builder detector = new Detector.Builder(); detector.setFunction("count"); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); analysisConfig.setBucketSpan(TimeValue.timeValueHours(1)); DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeField("time"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index e0fd4f224cc3b..439db5c21a914 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -92,9 +92,9 @@ private WrappedBatchedJobsIterator newJobIterator() { * Template method to allow implementation details of various types of data (e.g. results, model snapshots). * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job. */ - protected abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener); + abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener); - protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { + static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { return QueryBuilders.boolQuery() .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis")); From e5026ee0babea5c6291a5f6c7781942272b2ddb6 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 20 Jan 2020 18:05:02 +0000 Subject: [PATCH 10/14] Update docs/reference/ml/ml-shared.asciidoc Co-Authored-By: Lisa Cawley --- docs/reference/ml/ml-shared.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index 0159514eb2529..046b83894f868 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -863,7 +863,7 @@ example, `1575402236000 `. end::model-snapshot-id[] tag::model-snapshot-retention-days[] -Advanced configuration option. Denotes the period for which model snapshots +Advanced configuration option. The period of time (in days) that model snapshots are retained. will be retained. Age is calculated as the time from the newest model snapshot's timestamp, older snapshots are automatically deleted. The default value is `1`, which means snapshots are retained for one day (twenty-four hours). From 8bd4f993144a97a513938a81fd4cedaa2acc9e77 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 20 Jan 2020 18:06:06 +0000 Subject: [PATCH 11/14] =?UTF-8?q?Add=20=E2=80=98in=20days=E2=80=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/reference/ml/ml-shared.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index 046b83894f868..19750e983bbb6 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -965,7 +965,7 @@ end::results-index-name[] tag::results-retention-days[] Advanced configuration option. When set this option will prune results older -than this age. Age is calculated as the difference between the latest +than this age in days. Age is calculated as the difference between the latest bucket result's timestamp and the result's timestamp. Once per day at 00:30 (server time), results older than this are deleted from {es}. The default value is null, which means all results are retained. From 542869a73322a5d1c98c7101344d27c85ab71efe Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 21 Jan 2020 09:57:21 +0000 Subject: [PATCH 12/14] Apply docs suggestions from code review Co-Authored-By: Lisa Cawley --- docs/reference/ml/ml-shared.asciidoc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index 19750e983bbb6..bfac67772f01f 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -864,7 +864,7 @@ end::model-snapshot-id[] tag::model-snapshot-retention-days[] Advanced configuration option. The period of time (in days) that model snapshots are retained. -will be retained. Age is calculated as the time from the newest model snapshot's +Age is calculated relative to the timestamp of the newest model snapshot. timestamp, older snapshots are automatically deleted. The default value is `1`, which means snapshots are retained for one day (twenty-four hours). end::model-snapshot-retention-days[] @@ -964,10 +964,10 @@ is `shared`, which generates an index named `.ml-anomalies-shared`. end::results-index-name[] tag::results-retention-days[] -Advanced configuration option. When set this option will prune results older -than this age in days. Age is calculated as the difference between the latest -bucket result's timestamp and the result's timestamp. Once per day at 00:30 -(server time), results older than this are deleted from {es}. The default +Advanced configuration option. The period of time (in days) that results are retained. +Age is calculated relative to the timestamp of the latest bucket result. +If this property has a non-null value, once per day at 00:30 +(server time), results that are the specified number of days older than the latest bucket result are deleted from {es}. The default value is null, which means all results are retained. end::results-retention-days[] From c553a425e51801ed2707f8fcd3acf34cc75d940a Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 21 Jan 2020 09:57:57 +0000 Subject: [PATCH 13/14] Update docs/reference/ml/ml-shared.asciidoc Co-Authored-By: Lisa Cawley --- docs/reference/ml/ml-shared.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index bfac67772f01f..a57c9a2f59eac 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -866,7 +866,7 @@ tag::model-snapshot-retention-days[] Advanced configuration option. The period of time (in days) that model snapshots are retained. Age is calculated relative to the timestamp of the newest model snapshot. timestamp, older snapshots are automatically deleted. The default value is `1`, -which means snapshots are retained for one day (twenty-four hours). +which means snapshots that are one day (twenty-four hours) older than the newest snapshot are deleted. end::model-snapshot-retention-days[] tag::multivariate-by-fields[] From 7569a2addfaf7506258dc5ba169d4165a6b821bd Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 21 Jan 2020 10:03:52 +0000 Subject: [PATCH 14/14] Remove duplicated line --- docs/reference/ml/ml-shared.asciidoc | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index a57c9a2f59eac..f754698cd9d34 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -865,8 +865,8 @@ end::model-snapshot-id[] tag::model-snapshot-retention-days[] Advanced configuration option. The period of time (in days) that model snapshots are retained. Age is calculated relative to the timestamp of the newest model snapshot. -timestamp, older snapshots are automatically deleted. The default value is `1`, -which means snapshots that are one day (twenty-four hours) older than the newest snapshot are deleted. +The default value is `1`, which means snapshots that are one day (twenty-four hours) +older than the newest snapshot are deleted. end::model-snapshot-retention-days[] tag::multivariate-by-fields[] @@ -966,9 +966,10 @@ end::results-index-name[] tag::results-retention-days[] Advanced configuration option. The period of time (in days) that results are retained. Age is calculated relative to the timestamp of the latest bucket result. -If this property has a non-null value, once per day at 00:30 -(server time), results that are the specified number of days older than the latest bucket result are deleted from {es}. The default -value is null, which means all results are retained. +If this property has a non-null value, once per day at 00:30 (server time), +results that are the specified number of days older than the latest +bucket result are deleted from {es}. The default value is null, which means all +results are retained. end::results-retention-days[] tag::retain[]