diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index f25cfb94e8bed..f754698cd9d34 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -863,9 +863,10 @@ example, `1575402236000 `. end::model-snapshot-id[] tag::model-snapshot-retention-days[] -The time in days that model snapshots are retained for the job. Older snapshots -are deleted. The default value is `1`, which means snapshots are retained for -one day (twenty-four hours). +Advanced configuration option. The period of time (in days) that model snapshots are retained. +Age is calculated relative to the timestamp of the newest model snapshot. +The default value is `1`, which means snapshots that are one day (twenty-four hours) +older than the newest snapshot are deleted. end::model-snapshot-retention-days[] tag::multivariate-by-fields[] @@ -963,10 +964,12 @@ is `shared`, which generates an index named `.ml-anomalies-shared`. end::results-index-name[] tag::results-retention-days[] -Advanced configuration option. The number of days for which job results are -retained. Once per day at 00:30 (server time), results older than this period -are deleted from {es}. The default value is null, which means results are -retained. +Advanced configuration option. The period of time (in days) that results are retained. +Age is calculated relative to the timestamp of the latest bucket result. +If this property has a non-null value, once per day at 00:30 (server time), +results that are the specified number of days older than the latest +bucket result are deleted from {es}. The default value is null, which means all +results are retained. end::results-retention-days[] tag::retain[] diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 25e2e3f50f3c3..c420567754b8d 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -33,10 +33,8 @@ import org.junit.After; import org.junit.Before; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -52,20 +50,20 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { private static final String DATA_INDEX = "delete-expired-data-test-data"; @Before - public void setUpData() throws IOException { + public void setUpData() { client().admin().indices().prepareCreate(DATA_INDEX) .setMapping("time", "type=date,format=epoch_millis") .get(); - // We are going to create data for last 2 days - long nowMillis = System.currentTimeMillis(); + // We are going to create 3 days of data ending 1 hr ago + long latestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis(); int totalBuckets = 3 * 24; int normalRate = 10; int anomalousRate = 100; int anomalousBucket = 30; BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); for (int bucket = 0; bucket < totalBuckets; bucket++) { - long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis(); + long timestamp = latestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis(); int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate; for (int point = 0; point < bucketRate; point++) { IndexRequest indexRequest = new IndexRequest(DATA_INDEX); @@ -120,7 +118,7 @@ public void testDeleteExpiredData() throws Exception { String datafeedId = job.getId() + "-feed"; DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId()); - datafeedConfig.setIndices(Arrays.asList(DATA_INDEX)); + datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX)); DatafeedConfig datafeed = datafeedConfig.build(); registerDatafeed(datafeed); putDatafeed(datafeed); @@ -208,7 +206,7 @@ public void testDeleteExpiredData() throws Exception { assertThat(getModelSnapshots("no-retention").size(), equalTo(2)); List buckets = getBuckets("results-retention"); - assertThat(buckets.size(), is(lessThanOrEqualTo(24))); + assertThat(buckets.size(), is(lessThanOrEqualTo(25))); assertThat(buckets.size(), is(greaterThanOrEqualTo(22))); assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo)); assertThat(getRecords("results-retention").size(), equalTo(0)); @@ -223,7 +221,7 @@ public void testDeleteExpiredData() throws Exception { assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2)); buckets = getBuckets("results-and-snapshots-retention"); - assertThat(buckets.size(), is(lessThanOrEqualTo(24))); + assertThat(buckets.size(), is(lessThanOrEqualTo(25))); assertThat(buckets.size(), is(greaterThanOrEqualTo(22))); assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo)); assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0)); @@ -276,7 +274,7 @@ public void testDeleteExpiredData() throws Exception { private static Job.Builder newJobBuilder(String id) { Detector.Builder detector = new Detector.Builder(); detector.setFunction("count"); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); analysisConfig.setBucketSpan(TimeValue.timeValueHours(1)); DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeField("time"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index 1ff0cf2208853..273fe202e478b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -81,7 +81,7 @@ private void deleteExpiredData(ActionListener Supplier isTimedOutSupplier) { AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); List dataRemovers = Arrays.asList( - new ExpiredResultsRemover(client, auditor), + new ExpiredResultsRemover(client, auditor, threadPool), new ExpiredForecastsRemover(client, threadPool), new ExpiredModelSnapshotsRemover(client, threadPool), new UnusedStateRemover(client, clusterService) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index c6e3fe9dbf670..439db5c21a914 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -7,7 +7,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -16,12 +15,9 @@ import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; -import java.time.Clock; -import java.time.Instant; import java.util.Deque; import java.util.Iterator; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -68,9 +64,19 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure)); + + calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap( + cutoffEpochMs -> { + if (cutoffEpochMs == null) { + removeData(jobIterator, listener, isTimedOutSupplier); + } else { + removeDataBefore(job, cutoffEpochMs, ActionListener.wrap( + response -> removeData(jobIterator, listener, isTimedOutSupplier), + listener::onFailure)); + } + }, + listener::onFailure + )); } private WrappedBatchedJobsIterator newJobIterator() { @@ -78,20 +84,17 @@ private WrappedBatchedJobsIterator newJobIterator() { return new WrappedBatchedJobsIterator(jobsIterator); } - private long calcCutoffEpochMs(long retentionDays) { - long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli(); - return nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); - } + abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener); - protected abstract Long getRetentionDays(Job job); + abstract Long getRetentionDays(Job job); /** * Template method to allow implementation details of various types of data (e.g. results, model snapshots). * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job. */ - protected abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener); + abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener); - protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { + static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { return QueryBuilders.boolQuery() .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis")); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 221f9d9debf87..e76f1c7b13052 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -15,10 +15,14 @@ import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -27,12 +31,14 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * Deletes all model snapshots that have expired the configured retention time @@ -65,10 +71,59 @@ public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threa } @Override - protected Long getRetentionDays(Job job) { + Long getRetentionDays(Job job) { return job.getModelSnapshotRetentionDays(); } + @Override + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, + MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); + + latestSnapshotTimeStamp(jobId, ActionListener.wrap( + latestTime -> { + if (latestTime == null) { + threadedActionListener.onResponse(null); + } else { + long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); + threadedActionListener.onResponse(cutoff); + } + }, + listener::onFailure + )); + } + + private void latestSnapshotTimeStamp(String jobId, ActionListener listener) { + SortBuilder sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC); + QueryBuilder snapshotQuery = QueryBuilders.boolQuery() + .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.sort(sortBuilder); + searchSourceBuilder.query(snapshotQuery); + searchSourceBuilder.size(1); + searchSourceBuilder.trackTotalHits(false); + + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source(searchSourceBuilder); + searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); + + client.search(searchRequest, ActionListener.wrap( + response -> { + SearchHit[] hits = response.getHits().getHits(); + if (hits.length == 0) { + // no snapshots found + listener.onResponse(null); + } else { + ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef()); + listener.onResponse(snapshot.getTimestamp().getTime()); + } + }, + listener::onFailure) + ); + } + @Override protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { if (job.getModelSnapshotId() == null) { @@ -96,7 +151,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener expiredSnapshotsListener(String jobId, ActionListener listener) { - return new ActionListener() { + return new ActionListener<>() { @Override public void onResponse(SearchResponse searchResponse) { try { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index fff2c23ab75a6..16ba77c582e8f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -8,29 +8,50 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Forecast; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; +import java.io.IOException; +import java.io.InputStream; import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * Removes all results that have expired the configured retention time @@ -48,15 +69,17 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { private final OriginSettingClient client; private final AnomalyDetectionAuditor auditor; + private final ThreadPool threadPool; - public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor) { + public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor, ThreadPool threadPool) { super(client); this.client = Objects.requireNonNull(client); this.auditor = Objects.requireNonNull(auditor); + this.threadPool = Objects.requireNonNull(threadPool); } @Override - protected Long getRetentionDays(Job job) { + Long getRetentionDays(Job job) { return job.getResultsRetentionDays(); } @@ -65,7 +88,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener() { + client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { try { @@ -107,6 +130,59 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { return request; } + @Override + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, + MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); + latestBucketTime(jobId, ActionListener.wrap( + latestTime -> { + if (latestTime == null) { + threadedActionListener.onResponse(null); + } else { + long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); + threadedActionListener.onResponse(cutoff); + } + }, + listener::onFailure + )); + } + + private void latestBucketTime(String jobId, ActionListener listener) { + SortBuilder sortBuilder = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.DESC); + QueryBuilder bucketType = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.sort(sortBuilder); + searchSourceBuilder.query(bucketType); + searchSourceBuilder.size(1); + searchSourceBuilder.trackTotalHits(false); + + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source(searchSourceBuilder); + searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); + + client.search(searchRequest, ActionListener.wrap( + response -> { + SearchHit[] hits = response.getHits().getHits(); + if (hits.length == 0) { + // no buckets found + listener.onResponse(null); + } else { + + try (InputStream stream = hits[0].getSourceRef().streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + Bucket bucket = Bucket.LENIENT_PARSER.apply(parser, null); + listener.onResponse(bucket.getTimestamp().getTime()); + } catch (IOException e) { + listener.onFailure(new ElasticsearchParseException("failed to parse bucket", e)); + } + } + }, listener::onFailure + )); + } + private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) { Instant instant = Instant.ofEpochMilli(cutoffEpochMs); ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.systemDefault()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index eb29ba06b17ca..0d8955c05774f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -25,6 +26,8 @@ import org.junit.Before; import java.io.IOException; +import java.time.Clock; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -59,6 +62,11 @@ protected Long getRetentionDays(Job job) { return randomBoolean() ? null : 0L; } + void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { + long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli(); + listener.onResponse(nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis()); + } + @Override protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { listener.onResponse(Boolean.TRUE); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 6e332bf148d17..a178cd48b7cad 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -12,19 +12,16 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; -import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; -import org.junit.After; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -33,8 +30,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener; @@ -45,114 +43,97 @@ import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { private Client client; private OriginSettingClient originSettingClient; - private ThreadPool threadPool; private List capturedSearchRequests; private List capturedDeleteModelSnapshotRequests; - private List searchResponsesPerCall; private TestListener listener; @Before public void setUpTests() { capturedSearchRequests = new ArrayList<>(); capturedDeleteModelSnapshotRequests = new ArrayList<>(); - searchResponsesPerCall = new ArrayList<>(); client = mock(Client.class); originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN); listener = new TestListener(); - - // Init thread pool - Settings settings = Settings.builder() - .put("node.name", "expired_model_snapshots_remover_test") - .build(); - threadPool = new ThreadPool(settings, - new FixedExecutorBuilder(settings, MachineLearning.UTILITY_THREAD_POOL_NAME, 1, 1000, "")); - } - - @After - public void shutdownThreadPool() { - terminate(threadPool); - } - - public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { - givenClientRequestsSucceed(Arrays.asList( - JobTests.buildJobBuilder("foo").build(), - JobTests.buildJobBuilder("bar").build() - )); - - createExpiredModelSnapshotsRemover().remove(listener, () -> false); - - listener.waitToCompletion(); - assertThat(listener.success, is(true)); - verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); } public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException { - givenClientRequestsSucceed(Collections.singletonList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); + List responses = Arrays.asList( + AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(JobTests.buildJobBuilder("foo") + .setModelSnapshotRetentionDays(7L).build())), + AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); + + givenClientRequestsSucceed(responses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); - verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); + verify(client, times(2)).execute(eq(SearchAction.INSTANCE), any(), any()); } public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException { - givenClientRequestsSucceed( - Arrays.asList( - JobTests.buildJobBuilder("none").build(), - JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), - JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() - )); - - List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), - createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); - + List searchResponses = new ArrayList<>(); + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList( + JobTests.buildJobBuilder("job-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), + JobTests.buildJobBuilder("job-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() + ))); + + Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); + ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + + Date eightDaysAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(8).getMillis()); + ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted))); + + ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1))); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); + + givenClientRequestsSucceed(searchResponses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); - assertThat(capturedSearchRequests.size(), equalTo(2)); - SearchRequest searchRequest = capturedSearchRequests.get(0); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); - searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")})); + assertThat(capturedSearchRequests.size(), equalTo(5)); + SearchRequest searchRequest = capturedSearchRequests.get(1); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-1")})); + searchRequest = capturedSearchRequests.get(3); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-2")})); - assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3)); + assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); - deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2")); - deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1")); + assertThat(deleteSnapshotRequest.getJobId(), equalTo("job-1")); + assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("old-snapshot")); } public void testRemove_GivenTimeout() throws IOException { - givenClientRequestsSucceed( - Arrays.asList( + List searchResponses = new ArrayList<>(); + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList( JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() - )); + ))); List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), createModelSnapshot("snapshots-1", "snapshots-1_2")); List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + + givenClientRequestsSucceed(searchResponses); final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); @@ -164,52 +145,53 @@ public void testRemove_GivenTimeout() throws IOException { } public void testRemove_GivenClientSearchRequestsFail() throws IOException { - givenClientSearchRequestsFail( - Arrays.asList( - JobTests.buildJobBuilder("none").build(), + List searchResponses = new ArrayList<>(); + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList( JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() - )); - - List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), - createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + ))); + givenClientSearchRequestsFail(searchResponses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); - assertThat(capturedSearchRequests.size(), equalTo(1)); - SearchRequest searchRequest = capturedSearchRequests.get(0); + assertThat(capturedSearchRequests.size(), equalTo(2)); + SearchRequest searchRequest = capturedSearchRequests.get(1); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0)); } public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOException { - givenClientDeleteModelSnapshotRequestsFail( - Arrays.asList( - JobTests.buildJobBuilder("none").build(), + List searchResponses = new ArrayList<>(); + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList( JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() - )); + ))); - List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), + ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1"); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + + List snapshots1JobSnapshots = Arrays.asList( + snapshot1_1, createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + + ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1"); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2))); + givenClientDeleteModelSnapshotRequestsFail(searchResponses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); - assertThat(capturedSearchRequests.size(), equalTo(1)); - SearchRequest searchRequest = capturedSearchRequests.get(0); + assertThat(capturedSearchRequests.size(), equalTo(3)); + SearchRequest searchRequest = capturedSearchRequests.get(1); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); @@ -218,49 +200,76 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); } + @SuppressWarnings("unchecked") + public void testCalcCutoffEpochMs() throws IOException { + List searchResponses = new ArrayList<>(); + + Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); + ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "newest-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + + givenClientRequests(searchResponses, true, true); + + long retentionDays = 3L; + ActionListener cutoffListener = mock(ActionListener.class); + createExpiredModelSnapshotsRemover().calcCutoffEpochMs("job-1", retentionDays, cutoffListener); + + long dayInMills = 60 * 60 * 24 * 1000; + long expectedCutoffTime = oneDayAgo.getTime() - (dayInMills * retentionDays); + verify(cutoffListener).onResponse(eq(expectedCutoffTime)); + } + private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executor = mock(ExecutorService.class); + + when(threadPool.executor(eq(MachineLearning.UTILITY_THREAD_POOL_NAME))).thenReturn(executor); + + doAnswer(invocationOnMock -> { + Runnable run = (Runnable) invocationOnMock.getArguments()[0]; + run.run(); + return null; + } + ).when(executor).execute(any()); return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool); } private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) { - return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).build(); + return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(new Date()).build(); } - private void givenClientRequestsSucceed(List jobs) throws IOException { - givenClientRequests(jobs, true, true); + private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) { + return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build(); } - private void givenClientSearchRequestsFail(List jobs) throws IOException { - givenClientRequests(jobs, false, true); + private void givenClientRequestsSucceed(List searchResponses) { + givenClientRequests(searchResponses, true, true); } - private void givenClientDeleteModelSnapshotRequestsFail(List jobs) throws IOException { - givenClientRequests(jobs, true, false); + private void givenClientSearchRequestsFail(List searchResponses) { + givenClientRequests(searchResponses, false, true); + } + + private void givenClientDeleteModelSnapshotRequestsFail(List searchResponses) { + givenClientRequests(searchResponses, true, false); } @SuppressWarnings("unchecked") - private void givenClientRequests(List jobs, - boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) throws IOException { - SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); + private void givenClientRequests(List searchResponses, + boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) { doAnswer(new Answer() { - int callCount = 0; - AtomicBoolean isJobQuery = new AtomicBoolean(true); + AtomicInteger callCount = new AtomicInteger(); @Override public Void answer(InvocationOnMock invocationOnMock) { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - if (isJobQuery.get()) { - listener.onResponse(response); - isJobQuery.set(false); - return null; - } - SearchRequest searchRequest = (SearchRequest) invocationOnMock.getArguments()[1]; capturedSearchRequests.add(searchRequest); - if (shouldSearchRequestsSucceed) { - listener.onResponse(searchResponsesPerCall.get(callCount++)); + // Only the last search request should fail + if (shouldSearchRequestsSucceed || callCount.get() < searchResponses.size()) { + listener.onResponse(searchResponses.get(callCount.getAndIncrement())); } else { listener.onFailure(new RuntimeException("search failed")); } @@ -268,9 +277,7 @@ public Void answer(InvocationOnMock invocationOnMock) { } }).when(client).execute(same(SearchAction.INSTANCE), any(), any()); - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) { + doAnswer(invocationOnMock -> { capturedDeleteModelSnapshotRequests.add((DeleteModelSnapshotAction.Request) invocationOnMock.getArguments()[1]); ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; @@ -281,7 +288,6 @@ public Void answer(InvocationOnMock invocationOnMock) { } return null; } - }).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); + ).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); } - } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index b4c5a051fb8c1..29c8dad0c668c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -7,26 +7,32 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; import org.junit.Before; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.equalTo; @@ -50,13 +56,13 @@ public class ExpiredResultsRemoverTests extends ESTestCase { public void setUpTests() { capturedDeleteByQueryRequests = new ArrayList<>(); - client = org.mockito.Mockito.mock(Client.class); + client = mock(Client.class); originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN); listener = mock(ActionListener.class); } public void testRemove_GivenNoJobs() throws IOException { - givenClientRequestsSucceed(); + givenDBQRequestsSucceed(); AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList()); createExpiredResultsRemover().remove(listener, () -> false); @@ -66,7 +72,7 @@ public void testRemove_GivenNoJobs() throws IOException { } public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { - givenClientRequestsSucceed(); + givenDBQRequestsSucceed(); AbstractExpiredJobDataRemoverTests.givenJobs(client, Arrays.asList( JobTests.buildJobBuilder("foo").build(), @@ -79,14 +85,14 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); } - public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception { - givenClientRequestsSucceed(); - AbstractExpiredJobDataRemoverTests.givenJobs(client, - Arrays.asList( + public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() { + givenDBQRequestsSucceed(); + + givenSearchResponses(Arrays.asList( JobTests.buildJobBuilder("none").build(), JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), - JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() - )); + JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()), + new Bucket("id_not_important", new Date(), 60)); createExpiredResultsRemover().remove(listener, () -> false); @@ -98,13 +104,12 @@ public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception verify(listener).onResponse(true); } - public void testRemove_GivenTimeout() throws Exception { - givenClientRequestsSucceed(); - AbstractExpiredJobDataRemoverTests.givenJobs(client, - Arrays.asList( - JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), - JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() - )); + public void testRemove_GivenTimeout() { + givenDBQRequestsSucceed(); + givenSearchResponses(Arrays.asList( + JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), + JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() + ), new Bucket("id_not_important", new Date(), 60)); final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); @@ -115,14 +120,14 @@ public void testRemove_GivenTimeout() throws Exception { verify(listener).onResponse(false); } - public void testRemove_GivenClientRequestsFailed() throws IOException { - givenClientRequestsFailed(); - AbstractExpiredJobDataRemoverTests.givenJobs(client, + public void testRemove_GivenClientRequestsFailed() { + givenDBQRequestsFailed(); + givenSearchResponses( Arrays.asList( - JobTests.buildJobBuilder("none").build(), - JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), - JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() - )); + JobTests.buildJobBuilder("none").build(), + JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), + JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()), + new Bucket("id_not_important", new Date(), 60)); createExpiredResultsRemover().remove(listener, () -> false); @@ -132,19 +137,33 @@ public void testRemove_GivenClientRequestsFailed() throws IOException { verify(listener).onFailure(any()); } - private void givenClientRequestsSucceed() { - givenClientRequests(true); + @SuppressWarnings("unchecked") + public void testCalcCutoffEpochMs() { + String jobId = "calc-cutoff"; + Date latest = new Date(); + + givenSearchResponses(Collections.singletonList(JobTests.buildJobBuilder(jobId).setResultsRetentionDays(1L).build()), + new Bucket(jobId, latest, 60)); + + ActionListener cutoffListener = mock(ActionListener.class); + createExpiredResultsRemover().calcCutoffEpochMs(jobId, 1L, cutoffListener); + + long dayInMills = 60 * 60 * 24 * 1000; + long expectedCutoffTime = latest.getTime() - dayInMills; + verify(cutoffListener).onResponse(eq(expectedCutoffTime)); } - private void givenClientRequestsFailed() { - givenClientRequests(false); + private void givenDBQRequestsSucceed() { + givenDBQRequest(true); + } + + private void givenDBQRequestsFailed() { + givenDBQRequest(false); } @SuppressWarnings("unchecked") - private void givenClientRequests(boolean shouldSucceed) { - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) { + private void givenDBQRequest(boolean shouldSucceed) { + doAnswer(invocationOnMock -> { capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; @@ -157,10 +176,38 @@ public Void answer(InvocationOnMock invocationOnMock) { } return null; } - }).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); + ).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); + } + + @SuppressWarnings("unchecked") + private void givenSearchResponses(List jobs, Bucket bucket) { + doAnswer(invocationOnMock -> { + SearchRequest request = (SearchRequest) invocationOnMock.getArguments()[1]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + + if (request.indices()[0].startsWith(AnomalyDetectorsIndex.jobResultsIndexPrefix())) { + // asking for the bucket result + listener.onResponse(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(bucket))); + } else { + listener.onResponse(AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs)); + } + return null; + }).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); } private ExpiredResultsRemover createExpiredResultsRemover() { - return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class)); + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executor = mock(ExecutorService.class); + + when(threadPool.executor(eq(MachineLearning.UTILITY_THREAD_POOL_NAME))).thenReturn(executor); + + doAnswer(invocationOnMock -> { + Runnable run = (Runnable) invocationOnMock.getArguments()[0]; + run.run(); + return null; + } + ).when(executor).execute(any()); + + return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class), threadPool); } }