Skip to content

Commit f780c1e

Browse files
committed
[ML] complete machine learning plugin feature state clean up integration (elastic#71011)
This completes the machine learning feature state cleanup integration. This commit handles waiting for machine learning tasks to complete and adds a new field to the ML Metadata cluster state to indicate when a reset is in progress for machine learning. relates: elastic#70008
1 parent 29d10ac commit f780c1e

File tree

21 files changed

+742
-102
lines changed

21 files changed

+742
-102
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,11 @@ private XContentBuilder toXContentBuilder(ToXContent toXContent) {
184184
}
185185
}
186186

187-
private void writeBacklog() {
187+
protected void clearBacklog() {
188+
backlog = null;
189+
}
190+
191+
protected void writeBacklog() {
188192
assert backlog != null;
189193
if (backlog == null) {
190194
logger.error("Message back log has already been written");

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

+50-8
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,14 @@ public class MlMetadata implements XPackPlugin.XPackMetadataCustom {
5151
private static final ParseField JOBS_FIELD = new ParseField("jobs");
5252
private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");
5353
public static final ParseField UPGRADE_MODE = new ParseField("upgrade_mode");
54-
55-
public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), false);
54+
public static final ParseField RESET_MODE = new ParseField("reset_mode");
55+
56+
public static final MlMetadata EMPTY_METADATA = new MlMetadata(
57+
Collections.emptySortedMap(),
58+
Collections.emptySortedMap(),
59+
false,
60+
false
61+
);
5662
// This parser follows the pattern that metadata is parsed leniently (to allow for enhancements)
5763
public static final ObjectParser<Builder, Void> LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new);
5864

@@ -61,19 +67,21 @@ public class MlMetadata implements XPackPlugin.XPackMetadataCustom {
6167
LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds,
6268
(p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD);
6369
LENIENT_PARSER.declareBoolean(Builder::isUpgradeMode, UPGRADE_MODE);
64-
70+
LENIENT_PARSER.declareBoolean(Builder::isResetMode, RESET_MODE);
6571
}
6672

6773
private final SortedMap<String, Job> jobs;
6874
private final SortedMap<String, DatafeedConfig> datafeeds;
6975
private final boolean upgradeMode;
76+
private final boolean resetMode;
7077
private final GroupOrJobLookup groupOrJobLookup;
7178

72-
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds, boolean upgradeMode) {
79+
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds, boolean upgradeMode, boolean resetMode) {
7380
this.jobs = Collections.unmodifiableSortedMap(jobs);
7481
this.datafeeds = Collections.unmodifiableSortedMap(datafeeds);
7582
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
7683
this.upgradeMode = upgradeMode;
84+
this.resetMode = resetMode;
7785
}
7886

7987
public Map<String, Job> getJobs() {
@@ -105,6 +113,10 @@ public boolean isUpgradeMode() {
105113
return upgradeMode;
106114
}
107115

116+
public boolean isResetMode() {
117+
return resetMode;
118+
}
119+
108120
@Override
109121
public Version getMinimalSupportedVersion() {
110122
return Version.V_6_0_0_alpha1;
@@ -144,6 +156,11 @@ public MlMetadata(StreamInput in) throws IOException {
144156
} else {
145157
this.upgradeMode = false;
146158
}
159+
if (in.getVersion().onOrAfter(Version.V_7_13_0)) {
160+
this.resetMode = in.readBoolean();
161+
} else {
162+
this.resetMode = false;
163+
}
147164
}
148165

149166
@Override
@@ -153,6 +170,9 @@ public void writeTo(StreamOutput out) throws IOException {
153170
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
154171
out.writeBoolean(upgradeMode);
155172
}
173+
if (out.getVersion().onOrAfter(Version.V_7_13_0)) {
174+
out.writeBoolean(resetMode);
175+
}
156176
}
157177

158178
private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOutput out) throws IOException {
@@ -170,6 +190,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
170190
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
171191
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
172192
builder.field(UPGRADE_MODE.getPreferredName(), upgradeMode);
193+
builder.field(RESET_MODE.getPreferredName(), resetMode);
173194
return builder;
174195
}
175196

@@ -191,11 +212,13 @@ public static class MlMetadataDiff implements NamedDiff<Metadata.Custom> {
191212
final Diff<Map<String, Job>> jobs;
192213
final Diff<Map<String, DatafeedConfig>> datafeeds;
193214
final boolean upgradeMode;
215+
final boolean resetMode;
194216

195217
MlMetadataDiff(MlMetadata before, MlMetadata after) {
196218
this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer());
197219
this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer());
198220
this.upgradeMode = after.upgradeMode;
221+
this.resetMode = after.resetMode;
199222
}
200223

201224
public MlMetadataDiff(StreamInput in) throws IOException {
@@ -208,6 +231,11 @@ public MlMetadataDiff(StreamInput in) throws IOException {
208231
} else {
209232
upgradeMode = false;
210233
}
234+
if (in.getVersion().onOrAfter(Version.V_7_13_0)) {
235+
resetMode = in.readBoolean();
236+
} else {
237+
resetMode = false;
238+
}
211239
}
212240

213241
/**
@@ -219,7 +247,7 @@ public MlMetadataDiff(StreamInput in) throws IOException {
219247
public Metadata.Custom apply(Metadata.Custom part) {
220248
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
221249
TreeMap<String, DatafeedConfig> newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds));
222-
return new MlMetadata(newJobs, newDatafeeds, upgradeMode);
250+
return new MlMetadata(newJobs, newDatafeeds, upgradeMode, resetMode);
223251
}
224252

225253
@Override
@@ -229,6 +257,9 @@ public void writeTo(StreamOutput out) throws IOException {
229257
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
230258
out.writeBoolean(upgradeMode);
231259
}
260+
if (out.getVersion().onOrAfter(Version.V_7_13_0)) {
261+
out.writeBoolean(resetMode);
262+
}
232263
}
233264

234265
@Override
@@ -254,7 +285,8 @@ public boolean equals(Object o) {
254285
MlMetadata that = (MlMetadata) o;
255286
return Objects.equals(jobs, that.jobs) &&
256287
Objects.equals(datafeeds, that.datafeeds) &&
257-
Objects.equals(upgradeMode, that.upgradeMode);
288+
upgradeMode == that.upgradeMode &&
289+
resetMode == that.resetMode;
258290
}
259291

260292
@Override
@@ -264,14 +296,19 @@ public final String toString() {
264296

265297
@Override
266298
public int hashCode() {
267-
return Objects.hash(jobs, datafeeds, upgradeMode);
299+
return Objects.hash(jobs, datafeeds, upgradeMode, resetMode);
268300
}
269301

270302
public static class Builder {
271303

272304
private TreeMap<String, Job> jobs;
273305
private TreeMap<String, DatafeedConfig> datafeeds;
274306
private boolean upgradeMode;
307+
private boolean resetMode;
308+
309+
public static Builder from(@Nullable MlMetadata previous) {
310+
return new Builder(previous);
311+
}
275312

276313
public Builder() {
277314
jobs = new TreeMap<>();
@@ -353,8 +390,13 @@ public Builder isUpgradeMode(boolean upgradeMode) {
353390
return this;
354391
}
355392

393+
public Builder isResetMode(boolean resetMode) {
394+
this.resetMode = resetMode;
395+
return this;
396+
}
397+
356398
public MlMetadata build() {
357-
return new MlMetadata(jobs, datafeeds, upgradeMode);
399+
return new MlMetadata(jobs, datafeeds, upgradeMode, resetMode);
358400
}
359401
}
360402

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -111,44 +111,50 @@ public String getJobId() {
111111
return jobId;
112112
}
113113

114-
public void setJobId(String jobId) {
114+
public Request setJobId(String jobId) {
115115
this.jobId = jobId;
116+
return this;
116117
}
117118

118119
public TimeValue getCloseTimeout() {
119120
return timeout;
120121
}
121122

122-
public void setCloseTimeout(TimeValue timeout) {
123+
public Request setCloseTimeout(TimeValue timeout) {
123124
this.timeout = timeout;
125+
return this;
124126
}
125127

126128
public boolean isForce() {
127129
return force;
128130
}
129131

130-
public void setForce(boolean force) {
132+
public Request setForce(boolean force) {
131133
this.force = force;
134+
return this;
132135
}
133136

134137
public boolean allowNoMatch() {
135138
return allowNoMatch;
136139
}
137140

138-
public void setAllowNoMatch(boolean allowNoMatch) {
141+
public Request setAllowNoMatch(boolean allowNoMatch) {
139142
this.allowNoMatch = allowNoMatch;
143+
return this;
140144
}
141145

142146
public boolean isLocal() { return local; }
143147

144-
public void setLocal(boolean local) {
148+
public Request setLocal(boolean local) {
145149
this.local = local;
150+
return this;
146151
}
147152

148153
public String[] getOpenJobIds() { return openJobIds; }
149154

150-
public void setOpenJobIds(String [] openJobIds) {
155+
public Request setOpenJobIds(String[] openJobIds) {
151156
this.openJobIds = openJobIds;
157+
return this;
152158
}
153159

154160
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.core.ml.action;
8+
9+
import org.elasticsearch.action.ActionRequestValidationException;
10+
import org.elasticsearch.action.ActionType;
11+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
12+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
13+
import org.elasticsearch.common.ParseField;
14+
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.common.io.stream.StreamOutput;
16+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
17+
import org.elasticsearch.common.xcontent.ToXContentObject;
18+
import org.elasticsearch.common.xcontent.XContentBuilder;
19+
20+
import java.io.IOException;
21+
import java.util.Objects;
22+
23+
public class SetResetModeAction extends ActionType<AcknowledgedResponse> {
24+
25+
public static final SetResetModeAction INSTANCE = new SetResetModeAction();
26+
public static final String NAME = "cluster:internal/xpack/ml/reset_mode";
27+
28+
private SetResetModeAction() {
29+
super(NAME, AcknowledgedResponse::readFrom);
30+
}
31+
32+
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
33+
34+
public static Request enabled() {
35+
return new Request(true);
36+
}
37+
38+
public static Request disabled() {
39+
return new Request(false);
40+
}
41+
42+
private final boolean enabled;
43+
44+
private static final ParseField ENABLED = new ParseField("enabled");
45+
public static final ConstructingObjectParser<Request, Void> PARSER =
46+
new ConstructingObjectParser<>(NAME, a -> new Request((Boolean)a[0]));
47+
48+
static {
49+
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED);
50+
}
51+
52+
Request(boolean enabled) {
53+
this.enabled = enabled;
54+
}
55+
56+
public Request(StreamInput in) throws IOException {
57+
super(in);
58+
this.enabled = in.readBoolean();
59+
}
60+
61+
public boolean isEnabled() {
62+
return enabled;
63+
}
64+
65+
@Override
66+
public ActionRequestValidationException validate() {
67+
return null;
68+
}
69+
70+
@Override
71+
public void writeTo(StreamOutput out) throws IOException {
72+
super.writeTo(out);
73+
out.writeBoolean(enabled);
74+
}
75+
76+
@Override
77+
public int hashCode() {
78+
return Objects.hash(enabled);
79+
}
80+
81+
@Override
82+
public boolean equals(Object obj) {
83+
if (this == obj) {
84+
return true;
85+
}
86+
if (obj == null || obj.getClass() != getClass()) {
87+
return false;
88+
}
89+
Request other = (Request) obj;
90+
return Objects.equals(enabled, other.enabled);
91+
}
92+
93+
@Override
94+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
95+
builder.startObject();
96+
builder.field(ENABLED.getPreferredName(), enabled);
97+
builder.endObject();
98+
return builder;
99+
}
100+
}
101+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsAction.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,9 @@ public Request() {
9595
setTimeout(DEFAULT_TIMEOUT);
9696
}
9797

98-
public final void setId(String id) {
98+
public final Request setId(String id) {
9999
this.id = ExceptionsHelper.requireNonNull(id, DataFrameAnalyticsConfig.ID);
100+
return this;
100101
}
101102

102103
public String getId() {
@@ -107,16 +108,18 @@ public boolean allowNoMatch() {
107108
return allowNoMatch;
108109
}
109110

110-
public void setAllowNoMatch(boolean allowNoMatch) {
111+
public Request setAllowNoMatch(boolean allowNoMatch) {
111112
this.allowNoMatch = allowNoMatch;
113+
return this;
112114
}
113115

114116
public boolean isForce() {
115117
return force;
116118
}
117119

118-
public void setForce(boolean force) {
120+
public Request setForce(boolean force) {
119121
this.force = force;
122+
return this;
120123
}
121124

122125
@Nullable

0 commit comments

Comments
 (0)