From e985a173e63788109ae1bf6603da669f52d3eed4 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 26 May 2020 09:23:41 +0100 Subject: [PATCH] Fix delete_expired_data/nightly maintenance when many model snapshots need deleting (#57041) The queries performed by the expired data removers pull back entire documents when only a few fields are required. For ModelSnapshots in particular this is a problem as they contain quantiles which may be 100s of KB and the search size is set to 10,000. This change makes the search more efficient by only requesting the fields needed to work out which expired data should be deleted. # Conflicts: # x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java # x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java # x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java # x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java --- .../xpack/core/common/time/TimeUtils.java | 30 ++++ .../core/common/time/TimeUtilsTests.java | 6 + .../xpack/ml/extractor/TimeField.java | 19 +-- .../retention/ExpiredForecastsRemover.java | 73 +++++---- .../ExpiredModelSnapshotsRemover.java | 141 +++++++++++------- .../xpack/ml/job/retention/MlDataRemover.java | 20 +++ .../AbstractExpiredJobDataRemoverTests.java | 8 + .../ExpiredModelSnapshotsRemoverTests.java | 66 +++++--- .../ml/job/retention/MlDataRemoverTests.java | 30 ++++ 9 files changed, 281 insertions(+), 112 deletions(-) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java index 01667f8a48160..aee26a018304e 100644 --- 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java @@ -46,6 +46,36 @@ public static Instant parseTimeFieldToInstant(XContentParser parser, String fiel "unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]"); } + /** + * Safely parses a string epoch representation to a Long + * + * Commonly this function is used for parsing Date fields from doc values + * requested with the format "epoch_millis". + * + * Since nanosecond support was added epoch_millis timestamps may have a fractional component. + * We discard this, taking just whole milliseconds. Arguably it would be better to retain the + * precision here and let the downstream component decide whether it wants the accuracy, but + * that makes it hard to pass around the value as a number. The double type doesn't have + * enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would + * work, but that isn't supported by the JSON parser if the number gets round-tripped through + * JSON. So String is really the only format that could be used, but the consumers of time + * are expecting a number. + * + * @param epoch The epoch value as a string. This may contain a fractional component. + * @return The epoch value. + */ + public static long parseToEpochMs(String epoch) { + int dotPos = epoch.indexOf('.'); + if (dotPos == -1) { + return Long.parseLong(epoch); + } else if (dotPos > 0) { + return Long.parseLong(epoch.substring(0, dotPos)); + } else { + // The first character is '.' so round down to 0 + return 0L; + } + } + /** * First tries to parse the date first as a Long and convert that to an * epoch time. 
If the long number has more than 10 digits it is considered a diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java index e122202b5fa6c..0dcb245c78006 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java @@ -72,6 +72,12 @@ public void testDateStringToEpoch() { assertEquals(1477058573500L, TimeUtils.dateStringToEpoch("1477058573500")); } + public void testParseToEpochMs() { + assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000")); + assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000.005")); + assertEquals(0L, TimeUtils.parseToEpochMs(".005")); + } + public void testCheckMultiple_GivenMultiples() { TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.SECONDS, new ParseField("foo")); TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.MINUTES, new ParseField("foo")); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java index 24412fe6eb77c..9436dddde78db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java @@ -7,6 +7,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import java.util.Collections; import java.util.Objects; @@ -44,23 +45,7 @@ public Object[] value(SearchHit hit) { return value; } if (value[0] instanceof String) { // doc_value field with the epoch_millis format - // Since nanosecond support was added epoch_millis timestamps may have a fractional 
component. - // We discard this, taking just whole milliseconds. Arguably it would be better to retain the - // precision here and let the downstream component decide whether it wants the accuracy, but - // that makes it hard to pass around the value as a number. The double type doesn't have - // enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would - // work, but that isn't supported by the JSON parser if the number gets round-tripped through - // JSON. So String is really the only format that could be used, but the ML consumers of time - // are expecting a number. - String strVal0 = (String) value[0]; - int dotPos = strVal0.indexOf('.'); - if (dotPos == -1) { - value[0] = Long.parseLong(strVal0); - } else if (dotPos > 0) { - value[0] = Long.parseLong(strVal0.substring(0, dotPos)); - } else { - value[0] = 0L; - } + value[0] = TimeUtils.parseToEpochMs((String)value[0]); } else if (value[0] instanceof Long == false) { // pre-6.0 field throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index 40611438fda59..301a77be25660 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -14,11 +14,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; 
-import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -30,6 +25,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -38,8 +34,6 @@ import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.MachineLearning; -import java.io.IOException; -import java.io.InputStream; import java.time.Clock; import java.time.Instant; import java.util.ArrayList; @@ -85,6 +79,11 @@ public void remove(ActionListener listener, Supplier isTimedOu .filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName()))); source.size(MAX_FORECASTS); source.trackTotalHits(true); + source.fetchSource(false); + source.docValueField(Job.ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis"); + // _doc is the most efficient sort order and will also disable scoring source.sort(ElasticsearchMappings.ES_DOC); @@ -96,11 +95,9 @@ public void remove(ActionListener listener, Supplier isTimedOu } private void deleteForecasts(SearchResponse searchResponse, ActionListener listener, Supplier isTimedOutSupplier) { - List forecastsToDelete; - try { - forecastsToDelete = findForecastsToDelete(searchResponse); - } catch (IOException e) { - listener.onFailure(e); + List forecastsToDelete = findForecastsToDelete(searchResponse); + if (forecastsToDelete.isEmpty()) { + 
listener.onResponse(true); return; } @@ -131,8 +128,8 @@ public void onFailure(Exception e) { }); } - private List findForecastsToDelete(SearchResponse searchResponse) throws IOException { - List forecastsToDelete = new ArrayList<>(); + private List findForecastsToDelete(SearchResponse searchResponse) { + List forecastsToDelete = new ArrayList<>(); SearchHits hits = searchResponse.getHits(); if (hits.getTotalHits().value > MAX_FORECASTS) { @@ -140,19 +137,29 @@ private List findForecastsToDelete(SearchResponse searchRe } for (SearchHit hit : hits.getHits()) { - try (InputStream stream = hit.getSourceRef().streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( - NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { - ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null); - if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) { - forecastsToDelete.add(forecastRequestStats); + String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + if (expiryTime == null) { + LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(), + ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + continue; + } + long expiryMs = TimeUtils.parseToEpochMs(expiryTime); + if (expiryMs < cutoffEpochMs) { + JobForecastId idPair = new JobForecastId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + forecastsToDelete.add(idPair); } + } + } return forecastsToDelete; } - private DeleteByQueryRequest buildDeleteByQuery(List forecastsToDelete) { + private DeleteByQueryRequest buildDeleteByQuery(List ids) { DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); @@ -160,10 +167,12 @@ private DeleteByQueryRequest 
buildDeleteByQuery(List forec BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1); boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE)); - for (ForecastRequestStats forecastToDelete : forecastsToDelete) { - boolQuery.should(QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId())) - .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId()))); + for (JobForecastId jobForecastId : ids) { + if (jobForecastId.hasNullValue() == false) { + boolQuery.should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId)) + .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId))); + } } QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery); request.setQuery(query); @@ -173,4 +182,18 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec return request; } + + private static class JobForecastId { + private final String jobId; + private final String forecastId; + + private JobForecastId(String jobId, String forecastId) { + this.jobId = jobId; + this.forecastId = forecastId; + } + + boolean hasNullValue() { + return jobId == null || forecastId == null; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 593abd2273d2c..11401d580c8fb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -24,10 +24,10 @@ import org.elasticsearch.search.sort.SortBuilder; import 
org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; @@ -45,7 +45,7 @@ * of their respective job with the exception of the currently used snapshot. * A snapshot is deleted if its timestamp is earlier than the start of the * current day (local time-zone) minus the retention period. - * + *
<p>
* This is expected to be used by actions requiring admin rights. Thus, * it is also expected that the provided client will be a client with the * ML origin so that permissions to manage ML indices are met. @@ -55,9 +55,9 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover private static final Logger LOGGER = LogManager.getLogger(ExpiredModelSnapshotsRemover.class); /** - * The max number of snapshots to fetch per job. It is set to 10K, the default for an index as - * we don't change that in our ML indices. It should be more than enough for most cases. If not, - * it will take a few iterations to delete all snapshots, which is OK. + * The max number of snapshots to fetch per job. It is set to 10K, the default for an index as + * we don't change that in our ML indices. It should be more than enough for most cases. If not, + * it will take a few iterations to delete all snapshots, which is OK. */ private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000; @@ -78,31 +78,34 @@ Long getRetentionDays(Job job) { @Override void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool, - MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); + MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false); latestSnapshotTimeStamp(jobId, ActionListener.wrap( - latestTime -> { - if (latestTime == null) { - threadedActionListener.onResponse(null); - } else { - long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); - threadedActionListener.onResponse(cutoff); - } - }, - listener::onFailure + latestTime -> { + if (latestTime == null) { + threadedActionListener.onResponse(null); + } else { + long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); + threadedActionListener.onResponse(cutoff); + } + }, + listener::onFailure )); } private void latestSnapshotTimeStamp(String jobId, 
ActionListener listener) { SortBuilder sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC); QueryBuilder snapshotQuery = QueryBuilders.boolQuery() - .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())); + .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())) + .filter(QueryBuilders.existsQuery(ModelSnapshot.TIMESTAMP.getPreferredName())); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.sort(sortBuilder); searchSourceBuilder.query(snapshotQuery); searchSourceBuilder.size(1); searchSourceBuilder.trackTotalHits(false); + searchSourceBuilder.fetchSource(false); + searchSourceBuilder.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis"); String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); SearchRequest searchRequest = new SearchRequest(indexName); @@ -110,17 +113,23 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener listener searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); client.search(searchRequest, ActionListener.wrap( - response -> { - SearchHit[] hits = response.getHits().getHits(); - if (hits.length == 0) { - // no snapshots found + response -> { + SearchHit[] hits = response.getHits().getHits(); + if (hits.length == 0) { + // no snapshots found + listener.onResponse(null); + } else { + String timestamp = stringFieldValueOrNull(hits[0], ModelSnapshot.TIMESTAMP.getPreferredName()); + if (timestamp == null) { + LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hits[0].getId()); listener.onResponse(null); } else { - ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef()); - listener.onResponse(snapshot.getTimestamp().getTime()); + long timestampMs = TimeUtils.parseToEpochMs(timestamp); + listener.onResponse(timestampMs); } - }, - listener::onFailure) + } + }, + 
listener::onFailure) ); } @@ -137,17 +146,25 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener(LOGGER, threadPool, - MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false)); + MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false)); } private ActionListener expiredSnapshotsListener(String jobId, ActionListener listener) { @@ -155,11 +172,17 @@ private ActionListener expiredSnapshotsListener(String jobId, Ac @Override public void onResponse(SearchResponse searchResponse) { try { - List modelSnapshots = new ArrayList<>(); + List snapshotIds = new ArrayList<>(); for (SearchHit hit : searchResponse.getHits()) { - modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef())); + JobSnapshotId idPair = new JobSnapshotId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + snapshotIds.add(idPair); + } } - deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener); + deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener); } catch (Exception e) { onFailure(e); } @@ -167,34 +190,48 @@ public void onResponse(SearchResponse searchResponse) { @Override public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + jobId + "] Search for expired snapshots failed", e)); + listener.onFailure(new ElasticsearchException("[" + jobId + "] Search for expired snapshots failed", e)); } }; } - private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { + private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { if (modelSnapshotIterator.hasNext() == false) { listener.onResponse(true); return; } - ModelSnapshot modelSnapshot = modelSnapshotIterator.next(); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = 
new DeleteModelSnapshotAction.Request( - modelSnapshot.getJobId(), modelSnapshot.getSnapshotId()); + JobSnapshotId idPair = modelSnapshotIterator.next(); + DeleteModelSnapshotAction.Request deleteSnapshotRequest = + new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId); client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse response) { - try { - deleteModelSnapshots(modelSnapshotIterator, listener); - } catch (Exception e) { - onFailure(e); - } + @Override + public void onResponse(AcknowledgedResponse response) { + try { + deleteModelSnapshots(modelSnapshotIterator, listener); + } catch (Exception e) { + onFailure(e); } + } - @Override - public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot [" - + modelSnapshot.getSnapshotId() + "]", e)); - } - }); + @Override + public void onFailure(Exception e) { + listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot [" + + idPair.snapshotId + "]", e)); + } + }); + } + + static class JobSnapshotId { + private final String jobId; + private final String snapshotId; + + JobSnapshotId(String jobId, String snapshotId) { + this.jobId = jobId; + this.snapshotId = snapshotId; + } + + boolean hasNullValue() { + return jobId == null || snapshotId == null; + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 485d8e9bfa22d..0fa06262801f3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -6,9 +6,29 @@ package org.elasticsearch.xpack.ml.job.retention; import 
org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.search.SearchHit; import java.util.function.Supplier; public interface MlDataRemover { void remove(ActionListener listener, Supplier isTimedOutSupplier); + + /** + * Extract {@code fieldName} from {@code hit} and if it is a string + * return the string else {@code null}. + * @param hit The search hit + * @param fieldName Field to find + * @return value iff the docfield is present and it is a string. Otherwise {@code null} + */ + default String stringFieldValueOrNull(SearchHit hit, String fieldName) { + DocumentField docField = hit.field(fieldName); + if (docField != null) { + Object value = docField.getValue(); + if (value instanceof String) { + return (String)value; + } + } + return null; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 0d8955c05774f..f9e1b4a2c1d93 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -86,6 +86,14 @@ static SearchResponse createSearchResponse(List toXContent return createSearchResponse(toXContents, toXContents.size()); } + static SearchResponse createSearchResponseFromHits(List hits) { + SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[] {}), + new TotalHits(hits.size(), TotalHits.Relation.EQUAL_TO), 1.0f); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getHits()).thenReturn(searchHits); + return searchResponse; + } + @SuppressWarnings("unchecked") static void givenJobs(Client client, List jobs) throws IOException { SearchResponse response = 
AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index a178cd48b7cad..b34d9b05fe45a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -13,15 +13,19 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -89,17 +93,19 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException JobTests.buildJobBuilder("job-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() ))); - Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); - ModelSnapshot snapshot1_1 = 
createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + Date now = new Date(); + Date oneDayAgo = new Date(now.getTime() - TimeValue.timeValueDays(1).getMillis()); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "fresh-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); Date eightDaysAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(8).getMillis()); - ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted))); + SearchHit snapshotToBeDeleted = createModelSnapshotQueryHit("job-1", "old-snapshot", eightDaysAgo); + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshotToBeDeleted))); - ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1))); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); + SearchHit snapshot2_1 = createModelSnapshotQueryHit("job-1", "snapshots-1_1", eightDaysAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1))); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.emptyList())); givenClientRequestsSucceed(searchResponses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); @@ -173,16 +179,20 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio 
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() ))); - ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1"); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + Date now = new Date(); + Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); - List snapshots1JobSnapshots = Arrays.asList( + // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond + Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); + List snapshots1JobSnapshots = Arrays.asList( snapshot1_1, - createModelSnapshot("snapshots-1", "snapshots-1_2")); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo)); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(snapshots1JobSnapshots)); - ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1"); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2))); + SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2))); givenClientDeleteModelSnapshotRequestsFail(searchResponses); createExpiredModelSnapshotsRemover().remove(listener, () -> false); @@ -201,12 +211,12 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws 
IOExceptio } @SuppressWarnings("unchecked") - public void testCalcCutoffEpochMs() throws IOException { + public void testCalcCutoffEpochMs() { List searchResponses = new ArrayList<>(); Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); - ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "newest-snapshot", oneDayAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "newest-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); givenClientRequests(searchResponses, true, true); @@ -219,6 +229,17 @@ public void testCalcCutoffEpochMs() throws IOException { verify(cutoffListener).onResponse(eq(expectedCutoffTime)); } + public void testJobSnapshotId() { + ExpiredModelSnapshotsRemover.JobSnapshotId id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b"); + assertFalse(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b"); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null); + assertTrue(id.hasNullValue()); + } + private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executor = mock(ExecutorService.class); @@ -242,6 +263,15 @@ private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build(); } + private static SearchHit createModelSnapshotQueryHit(String jobId, String snapshotId, Date date) { + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField(Job.ID.getPreferredName(), Collections.singletonList(jobId)); + 
hitBuilder.addField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), Collections.singletonList(snapshotId)); + String dateAsString = Long.valueOf(date.getTime()).toString(); + hitBuilder.addField(ModelSnapshot.TIMESTAMP.getPreferredName(), Collections.singletonList(dateAsString)); + return hitBuilder.build(); + } + private void givenClientRequestsSucceed(List searchResponses) { givenClientRequests(searchResponses, true, true); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java new file mode 100644 index 0000000000000..5b5638a904a99 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; + +import java.util.Collections; +import java.util.Date; + +public class MlDataRemoverTests extends ESTestCase { + public void testStringOrNull() { + MlDataRemover remover = (listener, isTimedOutSupplier) -> { }; + + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "missing")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("not_a_string", Collections.singletonList(new Date())); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "not_a_string")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("string_field", Collections.singletonList("actual_string_value")); + assertEquals("actual_string_value", remover.stringFieldValueOrNull(hitBuilder.build(), "string_field")); + } +}