Skip to content

Commit 3f9a96e

Browse files
committed
[8.0] [ML] Improve cleanup for model snapshot upgrades
If a model snapshot upgrade persistent task is cancelled then we now kill any associated C++ process. Previously the C++ process could hang indefinitely. Additionally, ML feature reset now cancels any in-progress model snapshot upgrades before cleaning up job data, and deleting an anomaly detection job cancels any in-progress model snapshot upgrades associated with that job before cleaning up the job's data. Backport of elastic#81831
1 parent 882728e commit 3f9a96e

File tree

11 files changed

+510
-34
lines changed

11 files changed

+510
-34
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/util/ExpandedIdsMatcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,10 @@ public SimpleIdsMatcher(String[] tokens) {
202202
.collect(Collectors.toList());
203203
}
204204

205+
public SimpleIdsMatcher(String expression) {
206+
this(tokenizeExpression(expression));
207+
}
208+
205209
/**
206210
* Do any of the matchers match {@code id}?
207211
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ml.action;
9+
10+
import org.elasticsearch.action.ActionRequest;
11+
import org.elasticsearch.action.ActionRequestValidationException;
12+
import org.elasticsearch.action.ActionResponse;
13+
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.common.Strings;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.common.io.stream.Writeable;
18+
import org.elasticsearch.xcontent.ObjectParser;
19+
import org.elasticsearch.xcontent.ParseField;
20+
import org.elasticsearch.xcontent.ToXContentObject;
21+
import org.elasticsearch.xcontent.XContentBuilder;
22+
import org.elasticsearch.xpack.core.ml.job.config.Job;
23+
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
24+
25+
import java.io.IOException;
26+
import java.util.Objects;
27+
28+
public class CancelJobModelSnapshotUpgradeAction extends ActionType<CancelJobModelSnapshotUpgradeAction.Response> {
29+
30+
public static final CancelJobModelSnapshotUpgradeAction INSTANCE = new CancelJobModelSnapshotUpgradeAction();
31+
32+
// Even though at the time of writing this action doesn't have a REST endpoint the action name is
33+
// still "admin" rather than "internal". This is because there's no conceptual reason why this
34+
// action couldn't have a REST endpoint in the future, and it's painful to change these action
35+
// names after release. The only difference is that in 7.17 the last remaining transport client
36+
// users will be able to call this endpoint. In 8.x there is no transport client, so in 8.x there
37+
// is no difference between having "admin" and "internal" here in the period before a REST endpoint
38+
// exists. Using "admin" just makes life easier if we ever decide to add a REST endpoint in the
39+
// future.
40+
public static final String NAME = "cluster:admin/xpack/ml/job/model_snapshots/upgrade/cancel";
41+
42+
private CancelJobModelSnapshotUpgradeAction() {
43+
super(NAME, Response::new);
44+
}
45+
46+
public static class Request extends ActionRequest implements ToXContentObject {
47+
48+
public static final String ALL = "_all";
49+
50+
public static final ParseField SNAPSHOT_ID = new ParseField("snapshot_id");
51+
public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
52+
53+
static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
54+
55+
static {
56+
PARSER.declareString(Request::setJobId, Job.ID);
57+
PARSER.declareString(Request::setSnapshotId, SNAPSHOT_ID);
58+
PARSER.declareBoolean(Request::setAllowNoMatch, ALLOW_NO_MATCH);
59+
}
60+
61+
private String jobId = ALL;
62+
private String snapshotId = ALL;
63+
private boolean allowNoMatch = true;
64+
65+
public Request() {}
66+
67+
public Request(String jobId, String snapshotId) {
68+
setJobId(jobId);
69+
setSnapshotId(snapshotId);
70+
}
71+
72+
public Request(StreamInput in) throws IOException {
73+
super(in);
74+
jobId = in.readString();
75+
snapshotId = in.readString();
76+
allowNoMatch = in.readBoolean();
77+
}
78+
79+
public final Request setJobId(String jobId) {
80+
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID);
81+
return this;
82+
}
83+
84+
public String getJobId() {
85+
return jobId;
86+
}
87+
88+
public final Request setSnapshotId(String snapshotId) {
89+
this.snapshotId = ExceptionsHelper.requireNonNull(snapshotId, Job.ID);
90+
return this;
91+
}
92+
93+
public String getSnapshotId() {
94+
return snapshotId;
95+
}
96+
97+
public boolean allowNoMatch() {
98+
return allowNoMatch;
99+
}
100+
101+
public Request setAllowNoMatch(boolean allowNoMatch) {
102+
this.allowNoMatch = allowNoMatch;
103+
return this;
104+
}
105+
106+
@Override
107+
public ActionRequestValidationException validate() {
108+
return null;
109+
}
110+
111+
@Override
112+
public void writeTo(StreamOutput out) throws IOException {
113+
super.writeTo(out);
114+
out.writeString(jobId);
115+
out.writeString(snapshotId);
116+
out.writeBoolean(allowNoMatch);
117+
}
118+
119+
@Override
120+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
121+
return builder.startObject()
122+
.field(Job.ID.getPreferredName(), jobId)
123+
.field(SNAPSHOT_ID.getPreferredName(), snapshotId)
124+
.field(ALLOW_NO_MATCH.getPreferredName(), allowNoMatch)
125+
.endObject();
126+
}
127+
128+
@Override
129+
public int hashCode() {
130+
return Objects.hash(jobId, snapshotId, allowNoMatch);
131+
}
132+
133+
@Override
134+
public boolean equals(Object obj) {
135+
if (this == obj) {
136+
return true;
137+
}
138+
if (obj == null || obj.getClass() != getClass()) {
139+
return false;
140+
}
141+
Request other = (Request) obj;
142+
return Objects.equals(jobId, other.jobId) && Objects.equals(snapshotId, other.snapshotId) && allowNoMatch == other.allowNoMatch;
143+
}
144+
145+
@Override
146+
public String toString() {
147+
return Strings.toString(this);
148+
}
149+
}
150+
151+
public static class Response extends ActionResponse implements Writeable, ToXContentObject {
152+
153+
private final boolean cancelled;
154+
155+
public Response(boolean cancelled) {
156+
this.cancelled = cancelled;
157+
}
158+
159+
public Response(StreamInput in) throws IOException {
160+
cancelled = in.readBoolean();
161+
}
162+
163+
@Override
164+
public void writeTo(StreamOutput out) throws IOException {
165+
out.writeBoolean(cancelled);
166+
}
167+
168+
public boolean isCancelled() {
169+
return cancelled;
170+
}
171+
172+
@Override
173+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
174+
builder.startObject();
175+
builder.field("cancelled", cancelled);
176+
builder.endObject();
177+
return builder;
178+
}
179+
180+
@Override
181+
public boolean equals(Object o) {
182+
if (this == o) return true;
183+
if (o == null || getClass() != o.getClass()) return false;
184+
Response response = (Response) o;
185+
return cancelled == response.cancelled;
186+
}
187+
188+
@Override
189+
public int hashCode() {
190+
return Objects.hash(cancelled);
191+
}
192+
}
193+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ml.action;
9+
10+
import org.elasticsearch.common.io.stream.Writeable;
11+
import org.elasticsearch.test.AbstractSerializingTestCase;
12+
import org.elasticsearch.xcontent.XContentParser;
13+
import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction.Request;
14+
15+
public class CancelJobModelSnapshotUpgradeActionRequestTests extends AbstractSerializingTestCase<Request> {
16+
17+
@Override
18+
protected Request createTestInstance() {
19+
Request request = new Request(randomAlphaOfLengthBetween(5, 20), randomAlphaOfLengthBetween(5, 20));
20+
if (randomBoolean()) {
21+
request.setAllowNoMatch(randomBoolean());
22+
}
23+
return request;
24+
}
25+
26+
@Override
27+
protected boolean supportsUnknownFields() {
28+
return false;
29+
}
30+
31+
@Override
32+
protected Writeable.Reader<Request> instanceReader() {
33+
return Request::new;
34+
}
35+
36+
@Override
37+
protected Request doParseInstance(XContentParser parser) {
38+
return Request.PARSER.apply(parser, null);
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ml.action;
9+
10+
import org.elasticsearch.common.io.stream.Writeable;
11+
import org.elasticsearch.test.AbstractWireSerializingTestCase;
12+
import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction.Response;
13+
14+
public class CancelJobModelSnapshotUpgradeActionResponseTests extends AbstractWireSerializingTestCase<Response> {
15+
16+
@Override
17+
protected Response createTestInstance() {
18+
return new Response(randomBoolean());
19+
}
20+
21+
@Override
22+
protected Writeable.Reader<Response> instanceReader() {
23+
return CancelJobModelSnapshotUpgradeAction.Response::new;
24+
}
25+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.elasticsearch.xpack.core.ml.MlMetadata;
9090
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
9191
import org.elasticsearch.xpack.core.ml.MlTasks;
92+
import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction;
9293
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
9394
import org.elasticsearch.xpack.core.ml.action.CreateTrainedModelAllocationAction;
9495
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
@@ -183,6 +184,7 @@
183184
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskState;
184185
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
185186
import org.elasticsearch.xpack.core.template.TemplateUtils;
187+
import org.elasticsearch.xpack.ml.action.TransportCancelJobModelSnapshotUpgradeAction;
186188
import org.elasticsearch.xpack.ml.action.TransportCloseJobAction;
187189
import org.elasticsearch.xpack.ml.action.TransportCreateTrainedModelAllocationAction;
188190
import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarAction;
@@ -1281,6 +1283,7 @@ public List<RestHandler> getRestHandlers(
12811283
new ActionHandler<>(GetTrainedModelsStatsAction.INSTANCE, TransportGetTrainedModelsStatsAction.class),
12821284
new ActionHandler<>(PutTrainedModelAction.INSTANCE, TransportPutTrainedModelAction.class),
12831285
new ActionHandler<>(UpgradeJobModelSnapshotAction.INSTANCE, TransportUpgradeJobModelSnapshotAction.class),
1286+
new ActionHandler<>(CancelJobModelSnapshotUpgradeAction.INSTANCE, TransportCancelJobModelSnapshotUpgradeAction.class),
12841287
new ActionHandler<>(GetJobModelSnapshotsUpgradeStatsAction.INSTANCE, TransportGetJobModelSnapshotsUpgradeStatsAction.class),
12851288
new ActionHandler<>(PutTrainedModelAliasAction.INSTANCE, TransportPutTrainedModelAliasAction.class),
12861289
new ActionHandler<>(DeleteTrainedModelAliasAction.INSTANCE, TransportDeleteTrainedModelAliasAction.class),
@@ -1763,16 +1766,28 @@ public void cleanUpFeature(
17631766
}, unsetResetModeListener::onFailure);
17641767

17651768
// Stop data feeds
1769+
ActionListener<CancelJobModelSnapshotUpgradeAction.Response> cancelSnapshotUpgradesListener = ActionListener.wrap(
1770+
cancelUpgradesResponse -> {
1771+
StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all").setAllowNoMatch(true);
1772+
client.execute(
1773+
StopDatafeedAction.INSTANCE,
1774+
stopDatafeedsReq,
1775+
ActionListener.wrap(afterDataFeedsStopped::onResponse, failure -> {
1776+
logger.warn("failed stopping datafeeds for machine learning feature reset. Attempting with force=true", failure);
1777+
client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq.setForce(true), afterDataFeedsStopped);
1778+
})
1779+
);
1780+
},
1781+
unsetResetModeListener::onFailure
1782+
);
1783+
1784+
// Cancel model snapshot upgrades
17661785
ActionListener<AcknowledgedResponse> stopDeploymentsListener = ActionListener.wrap(acknowledgedResponse -> {
1767-
StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all").setAllowNoMatch(true);
1768-
client.execute(
1769-
StopDatafeedAction.INSTANCE,
1770-
stopDatafeedsReq,
1771-
ActionListener.wrap(afterDataFeedsStopped::onResponse, failure -> {
1772-
logger.warn("failed stopping datafeeds for machine learning feature reset. Attempting with force=true", failure);
1773-
client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq.setForce(true), afterDataFeedsStopped);
1774-
})
1786+
CancelJobModelSnapshotUpgradeAction.Request cancelSnapshotUpgradesReq = new CancelJobModelSnapshotUpgradeAction.Request(
1787+
"_all",
1788+
"_all"
17751789
);
1790+
client.execute(CancelJobModelSnapshotUpgradeAction.INSTANCE, cancelSnapshotUpgradesReq, cancelSnapshotUpgradesListener);
17761791
}, unsetResetModeListener::onFailure);
17771792

17781793
// Stop all model deployments

0 commit comments

Comments
 (0)