Skip to content

Commit 4ab10c5

Browse files
authored
[ML] [Deprecation] add deprecation check for job model snapshots that need upgraded (#66062)
This adds checks that verify that machine learning anomaly job model snapshots support the required minimal version. If any are not the required version, directions are given to either delete the model snapshot, or utilize the _upgrade API. relates: #64154
1 parent e49fd15 commit 4ab10c5

File tree

6 files changed

+211
-12
lines changed

6 files changed

+211
-12
lines changed

x-pack/plugin/deprecation/qa/rest/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ esplugin {
99
}
1010

1111
dependencies {
12+
javaRestTestImplementation project(':client:rest-high-level')
1213
javaRestTestImplementation("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
1314
javaRestTestImplementation("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}")
1415
}
@@ -25,6 +26,7 @@ restResources {
2526
testClusters.all {
2627
testDistribution = 'DEFAULT'
2728
setting 'xpack.security.enabled', 'false'
29+
setting 'xpack.license.self_generated.type', 'trial'
2830
}
2931

3032
tasks.named("test").configure { enabled = false }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.deprecation;
8+
9+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
10+
import org.elasticsearch.action.index.IndexRequest;
11+
import org.elasticsearch.client.RequestOptions;
12+
import org.elasticsearch.client.RestClient;
13+
import org.elasticsearch.client.RestHighLevelClient;
14+
import org.elasticsearch.client.WarningsHandler;
15+
import org.elasticsearch.client.migration.DeprecationInfoRequest;
16+
import org.elasticsearch.client.migration.DeprecationInfoResponse;
17+
import org.elasticsearch.client.ml.PutJobRequest;
18+
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
19+
import org.elasticsearch.client.ml.job.config.DataDescription;
20+
import org.elasticsearch.client.ml.job.config.Detector;
21+
import org.elasticsearch.client.ml.job.config.Job;
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
24+
import org.elasticsearch.common.xcontent.XContentType;
25+
import org.elasticsearch.search.SearchModule;
26+
import org.elasticsearch.test.rest.ESRestTestCase;
27+
28+
import java.util.ArrayList;
29+
import java.util.Collections;
30+
31+
import static org.hamcrest.Matchers.containsString;
32+
import static org.hamcrest.Matchers.hasSize;
33+
34+
public class MlDeprecationIT extends ESRestTestCase {
35+
36+
private static final RequestOptions REQUEST_OPTIONS = RequestOptions.DEFAULT.toBuilder()
37+
.setWarningsHandler(WarningsHandler.PERMISSIVE)
38+
.build();
39+
40+
private static class HLRC extends RestHighLevelClient {
41+
HLRC(RestClient restClient) {
42+
super(restClient, RestClient::close, new ArrayList<>());
43+
}
44+
}
45+
46+
@Override
47+
protected NamedXContentRegistry xContentRegistry() {
48+
SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
49+
return new NamedXContentRegistry(searchModule.getNamedXContents());
50+
}
51+
52+
@Override
53+
protected boolean enableWarningsCheck() {
54+
return false;
55+
}
56+
57+
public void testMlDeprecationChecks() throws Exception {
58+
HLRC hlrc = new HLRC(client());
59+
String jobId = "deprecation_check_job";
60+
hlrc.machineLearning()
61+
.putJob(
62+
new PutJobRequest(
63+
Job.builder(jobId)
64+
.setAnalysisConfig(
65+
AnalysisConfig.builder(Collections.singletonList(Detector.builder().setFunction("count").build()))
66+
)
67+
.setDataDescription(new DataDescription.Builder().setTimeField("time"))
68+
.build()
69+
),
70+
REQUEST_OPTIONS
71+
);
72+
73+
IndexRequest indexRequest = new IndexRequest(".ml-anomalies-.write-" + jobId).id(jobId + "_model_snapshot_1")
74+
.source("{\"job_id\":\"deprecation_check_job\",\"snapshot_id\":\"1\", \"snapshot_doc_count\":1}", XContentType.JSON);
75+
hlrc.index(indexRequest, REQUEST_OPTIONS);
76+
77+
indexRequest = new IndexRequest(".ml-anomalies-.write-" + jobId).id(jobId + "_model_snapshot_2")
78+
.source(
79+
"{\"job_id\":\"deprecation_check_job\",\"snapshot_id\":\"2\",\"snapshot_doc_count\":1,\"min_version\":\"8.0.0\"}",
80+
XContentType.JSON
81+
);
82+
hlrc.index(indexRequest, REQUEST_OPTIONS);
83+
hlrc.indices().refresh(new RefreshRequest(".ml-anomalies-*"), REQUEST_OPTIONS);
84+
85+
DeprecationInfoResponse response = hlrc.migration()
86+
.getDeprecationInfo(
87+
// specify an index so that deprecation checks don't run against any accidentally existing indices
88+
new DeprecationInfoRequest(Collections.singletonList("index-that-does-not-exist-*")),
89+
RequestOptions.DEFAULT
90+
);
91+
assertThat(response.getMlSettingsIssues(), hasSize(1));
92+
assertThat(
93+
response.getMlSettingsIssues().get(0).getMessage(),
94+
containsString("model snapshot [1] for job [deprecation_check_job] needs to be deleted or upgraded")
95+
);
96+
hlrc.close();
97+
}
98+
99+
}

x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/MlDeprecationChecker.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,22 @@
66

77
package org.elasticsearch.xpack.deprecation;
88

9+
import org.elasticsearch.Version;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.common.settings.Settings;
1112
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
1213
import org.elasticsearch.xpack.core.XPackSettings;
14+
import org.elasticsearch.xpack.core.action.util.PageParams;
1315
import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
1416
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
17+
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
1518
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
19+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
1620

1721
import java.util.ArrayList;
1822
import java.util.Collections;
1923
import java.util.List;
24+
import java.util.Locale;
2025
import java.util.Optional;
2126

2227
public class MlDeprecationChecker implements DeprecationChecker {
@@ -45,6 +50,29 @@ static Optional<DeprecationIssue> checkDataFeedAggregations(DatafeedConfig dataf
4550
}
4651
}
4752

53+
static Optional<DeprecationIssue> checkModelSnapshot(ModelSnapshot modelSnapshot) {
54+
if (modelSnapshot.getMinVersion().before(Version.V_7_0_0)) {
55+
return Optional.of(new DeprecationIssue(DeprecationIssue.Level.CRITICAL,
56+
String.format(
57+
Locale.ROOT,
58+
"model snapshot [%s] for job [%s] needs to be deleted or upgraded",
59+
modelSnapshot.getSnapshotId(),
60+
modelSnapshot.getJobId()
61+
),
62+
"https://www.elastic.co/guide/en/elasticsearch/reference/master/ml-upgrade-job-model-snapshot.html",
63+
String.format(
64+
Locale.ROOT,
65+
"model snapshot [%s] for job [%s] supports minimum version [%s] and needs to be at least [%s]",
66+
modelSnapshot.getSnapshotId(),
67+
modelSnapshot.getJobId(),
68+
modelSnapshot.getMinVersion(),
69+
Version.V_7_0_0
70+
)
71+
));
72+
}
73+
return Optional.empty();
74+
}
75+
4876
@Override
4977
public boolean enabled(Settings settings) {
5078
return XPackSettings.MACHINE_LEARNING_ENABLED.get(settings);
@@ -56,16 +84,36 @@ public void check(Components components, ActionListener<CheckResult> deprecation
5684
deprecationIssueListener.onResponse(new CheckResult(getName(), Collections.emptyList()));
5785
return;
5886
}
87+
List<DeprecationIssue> issues = Collections.synchronizedList(new ArrayList<>());
88+
final GetModelSnapshotsAction.Request getModelSnapshots = new GetModelSnapshotsAction.Request("*", null);
89+
getModelSnapshots.setPageParams(new PageParams(0, 50));
90+
getModelSnapshots.setSort(ModelSnapshot.MIN_VERSION.getPreferredName());
91+
92+
ActionListener<Void> getModelSnaphots = ActionListener.wrap(
93+
_unused -> components.client().execute(
94+
GetModelSnapshotsAction.INSTANCE,
95+
getModelSnapshots,
96+
ActionListener.wrap(
97+
modelSnapshots -> {
98+
modelSnapshots.getResources()
99+
.results()
100+
.forEach(modelSnapshot -> checkModelSnapshot(modelSnapshot)
101+
.ifPresent(issues::add));
102+
deprecationIssueListener.onResponse(new CheckResult(getName(), issues));
103+
},
104+
deprecationIssueListener::onFailure)
105+
),
106+
deprecationIssueListener::onFailure);
107+
59108
components.client().execute(
60109
GetDatafeedsAction.INSTANCE,
61110
new GetDatafeedsAction.Request(GetDatafeedsAction.ALL), ActionListener.wrap(
62111
datafeedsResponse -> {
63-
List<DeprecationIssue> issues = new ArrayList<>();
64112
for (DatafeedConfig df : datafeedsResponse.getResponse().results()) {
65113
checkDataFeedAggregations(df, components.xContentRegistry()).ifPresent(issues::add);
66114
checkDataFeedQuery(df, components.xContentRegistry()).ifPresent(issues::add);
67115
}
68-
deprecationIssueListener.onResponse(new CheckResult(getName(), issues));
116+
getModelSnaphots.onResponse(null);
69117
},
70118
deprecationIssueListener::onFailure
71119
)

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.common.xcontent.ToXContent;
3838
import org.elasticsearch.common.xcontent.XContentBuilder;
3939
import org.elasticsearch.common.xcontent.XContentFactory;
40+
import org.elasticsearch.common.xcontent.XContentType;
4041
import org.elasticsearch.threadpool.ThreadPool;
4142
import org.elasticsearch.xpack.core.ClientHelper;
4243
import org.elasticsearch.xpack.core.action.util.PageParams;
@@ -647,16 +648,30 @@ public void testGetSnapshots() {
647648
Job.Builder job = createJob(jobId);
648649
indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("snap_2")
649650
.setTimestamp(Date.from(Instant.ofEpochMilli(10)))
651+
.setMinVersion(Version.V_7_4_0)
650652
.build());
651653
indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("snap_1")
652654
.setTimestamp(Date.from(Instant.ofEpochMilli(11)))
655+
.setMinVersion(Version.V_7_2_0)
653656
.build());
654657
indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("other_snap")
655658
.setTimestamp(Date.from(Instant.ofEpochMilli(12)))
659+
.setMinVersion(Version.V_7_3_0)
656660
.build());
661+
createJob("other_job");
662+
indexModelSnapshot(new ModelSnapshot.Builder("other_job").setSnapshotId("other_snap")
663+
.setTimestamp(Date.from(Instant.ofEpochMilli(10)))
664+
.setMinVersion(Version.CURRENT)
665+
.build());
666+
// Add a snapshot WITHOUT a min version.
667+
client().prepareIndex(AnomalyDetectorsIndex.jobResultsAliasedName("other_job"))
668+
.setId(ModelSnapshot.documentId("other_job", "11"))
669+
.setSource("{\"job_id\":\"other_job\"," +
670+
"\"snapshot_id\":\"11\", \"snapshot_doc_count\":1,\"retain\":false}", XContentType.JSON)
671+
.get();
657672

658673
client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.jobStateIndexPattern(),
659-
AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).get();
674+
AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*").get();
660675

661676
PlainActionFuture<QueryPage<ModelSnapshot>> future = new PlainActionFuture<>();
662677
jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_2,snap_1", future::onResponse, future::onFailure);
@@ -683,6 +698,24 @@ public void testGetSnapshots() {
683698
assertThat(snapshots.get(0).getSnapshotId(), equalTo("snap_2"));
684699
assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1"));
685700
assertThat(snapshots.get(2).getSnapshotId(), equalTo("other_snap"));
701+
702+
future = new PlainActionFuture<>();
703+
jobProvider.modelSnapshots("*",
704+
0,
705+
5,
706+
null,
707+
null,
708+
"min_version",
709+
false,
710+
null,
711+
future::onResponse,
712+
future::onFailure);
713+
snapshots = future.actionGet().results();
714+
assertThat(snapshots.get(0).getSnapshotId(), equalTo("11"));
715+
assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1"));
716+
assertThat(snapshots.get(2).getSnapshotId(), equalTo("other_snap"));
717+
assertThat(snapshots.get(3).getSnapshotId(), equalTo("snap_2"));
718+
assertThat(snapshots.get(4).getSnapshotId(), equalTo("other_snap"));
686719
}
687720

688721
public void testGetAutodetectParams() throws Exception {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.support.ActionFilters;
1313
import org.elasticsearch.action.support.HandledTransportAction;
14+
import org.elasticsearch.common.Strings;
1415
import org.elasticsearch.common.inject.Inject;
1516
import org.elasticsearch.tasks.Task;
1617
import org.elasticsearch.transport.TransportService;
@@ -53,19 +54,29 @@ protected void doExecute(Task task, GetModelSnapshotsAction.Request request,
5354
request.getSort(),
5455
request.getDescOrder()));
5556

57+
if (Strings.isAllOrWildcard(request.getJobId())) {
58+
getModelSnapshots(request, listener);
59+
return;
60+
}
5661
jobManager.jobExists(request.getJobId(), ActionListener.wrap(
57-
ok -> {
58-
jobResultsProvider.modelSnapshots(request.getJobId(), request.getPageParams().getFrom(),
59-
request.getPageParams().getSize(), request.getStart(), request.getEnd(), request.getSort(),
60-
request.getDescOrder(), request.getSnapshotId(),
61-
page -> {
62-
listener.onResponse(new GetModelSnapshotsAction.Response(clearQuantiles(page)));
63-
}, listener::onFailure);
64-
},
65-
listener::onFailure
62+
ok -> getModelSnapshots(request, listener),
63+
listener::onFailure
6664
));
6765
}
6866

67+
private void getModelSnapshots(GetModelSnapshotsAction.Request request, ActionListener<GetModelSnapshotsAction.Response> listener) {
68+
jobResultsProvider.modelSnapshots(request.getJobId(),
69+
request.getPageParams().getFrom(),
70+
request.getPageParams().getSize(),
71+
request.getStart(),
72+
request.getEnd(),
73+
request.getSort(),
74+
request.getDescOrder(),
75+
request.getSnapshotId(),
76+
page -> listener.onResponse(new GetModelSnapshotsAction.Response(clearQuantiles(page))),
77+
listener::onFailure);
78+
}
79+
6980
public static QueryPage<ModelSnapshot> clearQuantiles(QueryPage<ModelSnapshot> page) {
7081
if (page.results() == null) {
7182
return page;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.ElasticsearchStatusException;
1313
import org.elasticsearch.ResourceAlreadyExistsException;
1414
import org.elasticsearch.ResourceNotFoundException;
15+
import org.elasticsearch.Version;
1516
import org.elasticsearch.action.ActionListener;
1617
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
1718
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -1062,6 +1063,11 @@ private void modelSnapshots(String jobId,
10621063

10631064
FieldSortBuilder sb = new FieldSortBuilder(sortField)
10641065
.order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
1066+
// `min_version` might not be present in very early snapshots.
1067+
// Consequently, we should treat it as being at least from 6.3.0 or before
1068+
if (sortField.equals(ModelSnapshot.MIN_VERSION.getPreferredName())) {
1069+
sb.missing(Version.fromString("6.3.0"));
1070+
}
10651071

10661072
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
10671073
LOGGER.trace("ES API CALL: search all model snapshots from index {} sort ascending {} with filter after sort from {} size {}",

0 commit comments

Comments
 (0)