Skip to content

Commit 7978f0b

Browse files
authored
[ML] Calculate results and snapshot retention using latest bucket timestamps (#51061)
The retention period is calculated relative to the last bucket result or snapshot time rather than wall clock
1 parent 8391b0a commit 7978f0b

File tree

9 files changed

+382
-186
lines changed

9 files changed

+382
-186
lines changed

docs/reference/ml/ml-shared.asciidoc

+10-7
Original file line numberDiff line numberDiff line change
@@ -863,9 +863,10 @@ example, `1575402236000 `.
863863
end::model-snapshot-id[]
864864

865865
tag::model-snapshot-retention-days[]
866-
The time in days that model snapshots are retained for the job. Older snapshots
867-
are deleted. The default value is `1`, which means snapshots are retained for
868-
one day (twenty-four hours).
866+
Advanced configuration option. The period of time (in days) that model snapshots are retained.
867+
Age is calculated relative to the timestamp of the newest model snapshot.
868+
The default value is `1`, which means snapshots that are one day (twenty-four hours)
869+
older than the newest snapshot are deleted.
869870
end::model-snapshot-retention-days[]
870871

871872
tag::multivariate-by-fields[]
@@ -963,10 +964,12 @@ is `shared`, which generates an index named `.ml-anomalies-shared`.
963964
end::results-index-name[]
964965

965966
tag::results-retention-days[]
966-
Advanced configuration option. The number of days for which job results are
967-
retained. Once per day at 00:30 (server time), results older than this period
968-
are deleted from {es}. The default value is null, which means results are
969-
retained.
967+
Advanced configuration option. The period of time (in days) that results are retained.
968+
Age is calculated relative to the timestamp of the latest bucket result.
969+
If this property has a non-null value, once per day at 00:30 (server time),
970+
results that are the specified number of days older than the latest
971+
bucket result are deleted from {es}. The default value is null, which means all
972+
results are retained.
970973
end::results-retention-days[]
971974

972975
tag::retain[]

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

+8-10
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,8 @@
3333
import org.junit.After;
3434
import org.junit.Before;
3535

36-
import java.io.IOException;
3736
import java.nio.charset.StandardCharsets;
3837
import java.util.ArrayList;
39-
import java.util.Arrays;
4038
import java.util.Collections;
4139
import java.util.List;
4240
import java.util.concurrent.TimeUnit;
@@ -52,20 +50,20 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
5250
private static final String DATA_INDEX = "delete-expired-data-test-data";
5351

5452
@Before
55-
public void setUpData() throws IOException {
53+
public void setUpData() {
5654
client().admin().indices().prepareCreate(DATA_INDEX)
5755
.setMapping("time", "type=date,format=epoch_millis")
5856
.get();
5957

60-
// We are going to create data for last 2 days
61-
long nowMillis = System.currentTimeMillis();
58+
// We are going to create 3 days of data ending 1 hr ago
59+
long latestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis();
6260
int totalBuckets = 3 * 24;
6361
int normalRate = 10;
6462
int anomalousRate = 100;
6563
int anomalousBucket = 30;
6664
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
6765
for (int bucket = 0; bucket < totalBuckets; bucket++) {
68-
long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
66+
long timestamp = latestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
6967
int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate;
7068
for (int point = 0; point < bucketRate; point++) {
7169
IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
@@ -120,7 +118,7 @@ public void testDeleteExpiredData() throws Exception {
120118

121119
String datafeedId = job.getId() + "-feed";
122120
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId());
123-
datafeedConfig.setIndices(Arrays.asList(DATA_INDEX));
121+
datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX));
124122
DatafeedConfig datafeed = datafeedConfig.build();
125123
registerDatafeed(datafeed);
126124
putDatafeed(datafeed);
@@ -208,7 +206,7 @@ public void testDeleteExpiredData() throws Exception {
208206
assertThat(getModelSnapshots("no-retention").size(), equalTo(2));
209207

210208
List<Bucket> buckets = getBuckets("results-retention");
211-
assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
209+
assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
212210
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
213211
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
214212
assertThat(getRecords("results-retention").size(), equalTo(0));
@@ -223,7 +221,7 @@ public void testDeleteExpiredData() throws Exception {
223221
assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2));
224222

225223
buckets = getBuckets("results-and-snapshots-retention");
226-
assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
224+
assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
227225
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
228226
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
229227
assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0));
@@ -276,7 +274,7 @@ public void testDeleteExpiredData() throws Exception {
276274
private static Job.Builder newJobBuilder(String id) {
277275
Detector.Builder detector = new Detector.Builder();
278276
detector.setFunction("count");
279-
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
277+
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
280278
analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
281279
DataDescription.Builder dataDescription = new DataDescription.Builder();
282280
dataDescription.setTimeField("time");

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
8181
Supplier<Boolean> isTimedOutSupplier) {
8282
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
8383
List<MlDataRemover> dataRemovers = Arrays.asList(
84-
new ExpiredResultsRemover(client, auditor),
84+
new ExpiredResultsRemover(client, auditor, threadPool),
8585
new ExpiredForecastsRemover(client, threadPool),
8686
new ExpiredModelSnapshotsRemover(client, threadPool),
8787
new UnusedStateRemover(client, clusterService)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java

+17-14
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.client.OriginSettingClient;
10-
import org.elasticsearch.common.unit.TimeValue;
1110
import org.elasticsearch.index.query.BoolQueryBuilder;
1211
import org.elasticsearch.index.query.QueryBuilders;
1312
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -16,12 +15,9 @@
1615
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
1716
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
1817

19-
import java.time.Clock;
20-
import java.time.Instant;
2118
import java.util.Deque;
2219
import java.util.Iterator;
2320
import java.util.List;
24-
import java.util.concurrent.TimeUnit;
2521
import java.util.function.Supplier;
2622
import java.util.stream.Collectors;
2723

@@ -68,30 +64,37 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<B
6864
removeData(jobIterator, listener, isTimedOutSupplier);
6965
return;
7066
}
71-
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
72-
removeDataBefore(job, cutoffEpochMs,
73-
ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure));
67+
68+
calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap(
69+
cutoffEpochMs -> {
70+
if (cutoffEpochMs == null) {
71+
removeData(jobIterator, listener, isTimedOutSupplier);
72+
} else {
73+
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(
74+
response -> removeData(jobIterator, listener, isTimedOutSupplier),
75+
listener::onFailure));
76+
}
77+
},
78+
listener::onFailure
79+
));
7480
}
7581

7682
private WrappedBatchedJobsIterator newJobIterator() {
7783
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
7884
return new WrappedBatchedJobsIterator(jobsIterator);
7985
}
8086

81-
private long calcCutoffEpochMs(long retentionDays) {
82-
long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
83-
return nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
84-
}
87+
abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener);
8588

86-
protected abstract Long getRetentionDays(Job job);
89+
abstract Long getRetentionDays(Job job);
8790

8891
/**
8992
* Template method to allow implementation details of various types of data (e.g. results, model snapshots).
9093
* Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job.
9194
*/
92-
protected abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);
95+
abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);
9396

94-
protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
97+
static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
9598
return QueryBuilders.boolQuery()
9699
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
97100
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java

+57-2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@
1515
import org.elasticsearch.action.support.ThreadedActionListener;
1616
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1717
import org.elasticsearch.client.OriginSettingClient;
18+
import org.elasticsearch.common.unit.TimeValue;
1819
import org.elasticsearch.index.query.QueryBuilder;
1920
import org.elasticsearch.index.query.QueryBuilders;
2021
import org.elasticsearch.search.SearchHit;
2122
import org.elasticsearch.search.builder.SearchSourceBuilder;
23+
import org.elasticsearch.search.sort.FieldSortBuilder;
24+
import org.elasticsearch.search.sort.SortBuilder;
25+
import org.elasticsearch.search.sort.SortOrder;
2226
import org.elasticsearch.threadpool.ThreadPool;
2327
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
2428
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -27,12 +31,14 @@
2731
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
2832
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
2933
import org.elasticsearch.xpack.ml.MachineLearning;
34+
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
3035
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
3136

3237
import java.util.ArrayList;
3338
import java.util.Iterator;
3439
import java.util.List;
3540
import java.util.Objects;
41+
import java.util.concurrent.TimeUnit;
3642

3743
/**
3844
* Deletes all model snapshots that have expired the configured retention time
@@ -65,10 +71,59 @@ public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threa
6571
}
6672

6773
@Override
68-
protected Long getRetentionDays(Job job) {
74+
Long getRetentionDays(Job job) {
6975
return job.getModelSnapshotRetentionDays();
7076
}
7177

78+
@Override
79+
void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
80+
ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
81+
MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
82+
83+
latestSnapshotTimeStamp(jobId, ActionListener.wrap(
84+
latestTime -> {
85+
if (latestTime == null) {
86+
threadedActionListener.onResponse(null);
87+
} else {
88+
long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
89+
threadedActionListener.onResponse(cutoff);
90+
}
91+
},
92+
listener::onFailure
93+
));
94+
}
95+
96+
private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener) {
97+
SortBuilder<?> sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
98+
QueryBuilder snapshotQuery = QueryBuilders.boolQuery()
99+
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()));
100+
101+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
102+
searchSourceBuilder.sort(sortBuilder);
103+
searchSourceBuilder.query(snapshotQuery);
104+
searchSourceBuilder.size(1);
105+
searchSourceBuilder.trackTotalHits(false);
106+
107+
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
108+
SearchRequest searchRequest = new SearchRequest(indexName);
109+
searchRequest.source(searchSourceBuilder);
110+
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
111+
112+
client.search(searchRequest, ActionListener.wrap(
113+
response -> {
114+
SearchHit[] hits = response.getHits().getHits();
115+
if (hits.length == 0) {
116+
// no snapshots found
117+
listener.onResponse(null);
118+
} else {
119+
ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
120+
listener.onResponse(snapshot.getTimestamp().getTime());
121+
}
122+
},
123+
listener::onFailure)
124+
);
125+
}
126+
72127
@Override
73128
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
74129
if (job.getModelSnapshotId() == null) {
@@ -96,7 +151,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
96151
}
97152

98153
private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, ActionListener<Boolean> listener) {
99-
return new ActionListener<SearchResponse>() {
154+
return new ActionListener<>() {
100155
@Override
101156
public void onResponse(SearchResponse searchResponse) {
102157
try {

0 commit comments

Comments
 (0)