Skip to content

Commit d6f36a8

Browse files
[ML] Set df-analytics task state to failed when appropriate (#43880)
This introduces a `failed` state to which the data frame analytics persistent task is set to when something unexpected fails. It could be the process crashing, the results processor hitting some error, etc. The failure message is then captured and set on the task state. From there, it becomes available via the _stats API as `failure_reason`. The df-analytics stop API now has a `force` boolean parameter. This allows the user to call it for a failed task in order to reset it to `stopped` after we have ensured the failure has been communicated to the user. This commit also adds the analytics version in the persistent task params as this allows us to prevent tasks to run on unsuitable nodes in the future.
1 parent 00a5e5a commit d6f36a8

File tree

34 files changed

+565
-117
lines changed

34 files changed

+565
-117
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

+3
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,9 @@ static Request stopDataFrameAnalytics(StopDataFrameAnalyticsRequest stopRequest)
676676
params.putParam(
677677
StopDataFrameAnalyticsRequest.ALLOW_NO_MATCH.getPreferredName(), Boolean.toString(stopRequest.getAllowNoMatch()));
678678
}
679+
if (stopRequest.getForce() != null) {
680+
params.putParam(StopDataFrameAnalyticsRequest.FORCE.getPreferredName(), Boolean.toString(stopRequest.getForce()));
681+
}
679682
request.addParameters(params.asMap());
680683
return request;
681684
}

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StopDataFrameAnalyticsRequest.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@
3131
public class StopDataFrameAnalyticsRequest implements Validatable {
3232

3333
public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
34+
public static final ParseField FORCE = new ParseField("force");
3435

3536
private final String id;
36-
private TimeValue timeout;
3737
private Boolean allowNoMatch;
38+
private Boolean force;
39+
private TimeValue timeout;
3840

3941
public StopDataFrameAnalyticsRequest(String id) {
4042
this.id = id;
@@ -62,6 +64,15 @@ public StopDataFrameAnalyticsRequest setAllowNoMatch(boolean allowNoMatch) {
6264
return this;
6365
}
6466

67+
public Boolean getForce() {
68+
return force;
69+
}
70+
71+
public StopDataFrameAnalyticsRequest setForce(boolean force) {
72+
this.force = force;
73+
return this;
74+
}
75+
6576
@Override
6677
public Optional<ValidationException> validate() {
6778
if (id == null) {
@@ -78,11 +89,12 @@ public boolean equals(Object o) {
7889
StopDataFrameAnalyticsRequest other = (StopDataFrameAnalyticsRequest) o;
7990
return Objects.equals(id, other.id)
8091
&& Objects.equals(timeout, other.timeout)
81-
&& Objects.equals(allowNoMatch, other.allowNoMatch);
92+
&& Objects.equals(allowNoMatch, other.allowNoMatch)
93+
&& Objects.equals(force, other.force);
8294
}
8395

8496
@Override
8597
public int hashCode() {
86-
return Objects.hash(id, timeout, allowNoMatch);
98+
return Objects.hash(id, timeout, allowNoMatch, force);
8799
}
88100
}

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java

+18-6
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws
4141

4242
static final ParseField ID = new ParseField("id");
4343
static final ParseField STATE = new ParseField("state");
44+
static final ParseField FAILURE_REASON = new ParseField("failure_reason");
4445
static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent");
4546
static final ParseField NODE = new ParseField("node");
4647
static final ParseField ASSIGNMENT_EXPLANATION = new ParseField("assignment_explanation");
@@ -50,9 +51,10 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws
5051
args -> new DataFrameAnalyticsStats(
5152
(String) args[0],
5253
(DataFrameAnalyticsState) args[1],
53-
(Integer) args[2],
54-
(NodeAttributes) args[3],
55-
(String) args[4]));
54+
(String) args[2],
55+
(Integer) args[3],
56+
(NodeAttributes) args[4],
57+
(String) args[5]));
5658

5759
static {
5860
PARSER.declareString(constructorArg(), ID);
@@ -62,21 +64,25 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws
6264
}
6365
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
6466
}, STATE, ObjectParser.ValueType.STRING);
67+
PARSER.declareString(optionalConstructorArg(), FAILURE_REASON);
6568
PARSER.declareInt(optionalConstructorArg(), PROGRESS_PERCENT);
6669
PARSER.declareObject(optionalConstructorArg(), NodeAttributes.PARSER, NODE);
6770
PARSER.declareString(optionalConstructorArg(), ASSIGNMENT_EXPLANATION);
6871
}
6972

7073
private final String id;
7174
private final DataFrameAnalyticsState state;
75+
private final String failureReason;
7276
private final Integer progressPercent;
7377
private final NodeAttributes node;
7478
private final String assignmentExplanation;
7579

76-
public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable Integer progressPercent,
77-
@Nullable NodeAttributes node, @Nullable String assignmentExplanation) {
80+
public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable String failureReason,
81+
@Nullable Integer progressPercent, @Nullable NodeAttributes node,
82+
@Nullable String assignmentExplanation) {
7883
this.id = id;
7984
this.state = state;
85+
this.failureReason = failureReason;
8086
this.progressPercent = progressPercent;
8187
this.node = node;
8288
this.assignmentExplanation = assignmentExplanation;
@@ -90,6 +96,10 @@ public DataFrameAnalyticsState getState() {
9096
return state;
9197
}
9298

99+
public String getFailureReason() {
100+
return failureReason;
101+
}
102+
93103
public Integer getProgressPercent() {
94104
return progressPercent;
95105
}
@@ -110,21 +120,23 @@ public boolean equals(Object o) {
110120
DataFrameAnalyticsStats other = (DataFrameAnalyticsStats) o;
111121
return Objects.equals(id, other.id)
112122
&& Objects.equals(state, other.state)
123+
&& Objects.equals(failureReason, other.failureReason)
113124
&& Objects.equals(progressPercent, other.progressPercent)
114125
&& Objects.equals(node, other.node)
115126
&& Objects.equals(assignmentExplanation, other.assignmentExplanation);
116127
}
117128

118129
@Override
119130
public int hashCode() {
120-
return Objects.hash(id, state, progressPercent, node, assignmentExplanation);
131+
return Objects.hash(id, state, failureReason, progressPercent, node, assignmentExplanation);
121132
}
122133

123134
@Override
124135
public String toString() {
125136
return new ToStringBuilder(getClass())
126137
.add("id", id)
127138
.add("state", state)
139+
.add("failureReason", failureReason)
128140
.add("progressPercent", progressPercent)
129141
.add("node", node)
130142
.add("assignmentExplanation", assignmentExplanation)

client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -758,11 +758,15 @@ public void testStopDataFrameAnalytics() {
758758
public void testStopDataFrameAnalytics_WithParams() {
759759
StopDataFrameAnalyticsRequest stopRequest = new StopDataFrameAnalyticsRequest(randomAlphaOfLength(10))
760760
.setTimeout(TimeValue.timeValueMinutes(1))
761-
.setAllowNoMatch(false);
761+
.setAllowNoMatch(false)
762+
.setForce(true);
762763
Request request = MLRequestConverters.stopDataFrameAnalytics(stopRequest);
763764
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
764765
assertEquals("/_ml/data_frame/analytics/" + stopRequest.getId() + "/_stop", request.getEndpoint());
765-
assertThat(request.getParameters(), allOf(hasEntry("timeout", "1m"), hasEntry("allow_no_match", "false")));
766+
assertThat(request.getParameters(), allOf(
767+
hasEntry("timeout", "1m"),
768+
hasEntry("allow_no_match", "false"),
769+
hasEntry("force", "true")));
766770
assertNull(request.getEntity());
767771
}
768772

client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -1359,6 +1359,7 @@ public void testGetDataFrameAnalyticsStats() throws Exception {
13591359
DataFrameAnalyticsStats stats = statsResponse.getAnalyticsStats().get(0);
13601360
assertThat(stats.getId(), equalTo(configId));
13611361
assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STOPPED));
1362+
assertNull(stats.getFailureReason());
13621363
assertNull(stats.getProgressPercent());
13631364
assertNull(stats.getNode());
13641365
assertNull(stats.getAssignmentExplanation());

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -3110,6 +3110,7 @@ public void testStopDataFrameAnalytics() throws Exception {
31103110
{
31113111
// tag::stop-data-frame-analytics-request
31123112
StopDataFrameAnalyticsRequest request = new StopDataFrameAnalyticsRequest("my-analytics-config"); // <1>
3113+
request.setForce(false); // <2>
31133114
// end::stop-data-frame-analytics-request
31143115

31153116
// tag::stop-data-frame-analytics-execute

client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public static DataFrameAnalyticsStats randomDataFrameAnalyticsStats() {
4343
return new DataFrameAnalyticsStats(
4444
randomAlphaOfLengthBetween(1, 10),
4545
randomFrom(DataFrameAnalyticsState.values()),
46+
randomBoolean() ? null : randomAlphaOfLength(10),
4647
randomBoolean() ? null : randomIntBetween(0, 100),
4748
randomBoolean() ? null : NodeAttributesTests.createRandom(),
4849
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20));
@@ -52,6 +53,9 @@ public static void toXContent(DataFrameAnalyticsStats stats, XContentBuilder bui
5253
builder.startObject();
5354
builder.field(DataFrameAnalyticsStats.ID.getPreferredName(), stats.getId());
5455
builder.field(DataFrameAnalyticsStats.STATE.getPreferredName(), stats.getState().value());
56+
if (stats.getFailureReason() != null) {
57+
builder.field(DataFrameAnalyticsStats.FAILURE_REASON.getPreferredName(), stats.getFailureReason());
58+
}
5559
if (stats.getProgressPercent() != null) {
5660
builder.field(DataFrameAnalyticsStats.PROGRESS_PERCENT.getPreferredName(), stats.getProgressPercent());
5761
}

docs/java-rest/high-level/ml/stop-data-frame-analytics.asciidoc

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ A +{request}+ object requires a {dataframe-analytics-config} id.
1919
include-tagged::{doc-tests-file}[{api}-request]
2020
---------------------------------------------------
2121
<1> Constructing a new stop request referencing an existing {dataframe-analytics-config}
22+
<2> Optionally used to stop a failed task
2223

2324
include::../execution.asciidoc[]
2425

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
package org.elasticsearch.xpack.core.ml.action;
77

88
import org.elasticsearch.ElasticsearchException;
9-
import org.elasticsearch.action.ActionType;
109
import org.elasticsearch.action.ActionRequestBuilder;
1110
import org.elasticsearch.action.ActionRequestValidationException;
11+
import org.elasticsearch.action.ActionType;
1212
import org.elasticsearch.action.TaskOperationFailure;
1313
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
1414
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
@@ -158,16 +158,19 @@ public static class Stats implements ToXContentObject, Writeable {
158158
private final String id;
159159
private final DataFrameAnalyticsState state;
160160
@Nullable
161+
private final String failureReason;
162+
@Nullable
161163
private final Integer progressPercentage;
162164
@Nullable
163165
private final DiscoveryNode node;
164166
@Nullable
165167
private final String assignmentExplanation;
166168

167-
public Stats(String id, DataFrameAnalyticsState state, @Nullable Integer progressPercentage,
169+
public Stats(String id, DataFrameAnalyticsState state, @Nullable String failureReason, @Nullable Integer progressPercentage,
168170
@Nullable DiscoveryNode node, @Nullable String assignmentExplanation) {
169171
this.id = Objects.requireNonNull(id);
170172
this.state = Objects.requireNonNull(state);
173+
this.failureReason = failureReason;
171174
this.progressPercentage = progressPercentage;
172175
this.node = node;
173176
this.assignmentExplanation = assignmentExplanation;
@@ -176,6 +179,7 @@ public Stats(String id, DataFrameAnalyticsState state, @Nullable Integer progres
176179
public Stats(StreamInput in) throws IOException {
177180
id = in.readString();
178181
state = DataFrameAnalyticsState.fromStream(in);
182+
failureReason = in.readOptionalString();
179183
progressPercentage = in.readOptionalInt();
180184
node = in.readOptionalWriteable(DiscoveryNode::new);
181185
assignmentExplanation = in.readOptionalString();
@@ -202,6 +206,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
202206
public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOException {
203207
builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id);
204208
builder.field("state", state.toString());
209+
if (failureReason != null) {
210+
builder.field("failure_reason", failureReason);
211+
}
205212
if (progressPercentage != null) {
206213
builder.field("progress_percent", progressPercentage);
207214
}
@@ -229,14 +236,15 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc
229236
public void writeTo(StreamOutput out) throws IOException {
230237
out.writeString(id);
231238
state.writeTo(out);
239+
out.writeOptionalString(failureReason);
232240
out.writeOptionalInt(progressPercentage);
233241
out.writeOptionalWriteable(node);
234242
out.writeOptionalString(assignmentExplanation);
235243
}
236244

237245
@Override
238246
public int hashCode() {
239-
return Objects.hash(id, state, progressPercentage, node, assignmentExplanation);
247+
return Objects.hash(id, state, failureReason, progressPercentage, node, assignmentExplanation);
240248
}
241249

242250
@Override
@@ -250,6 +258,7 @@ public boolean equals(Object obj) {
250258
Stats other = (Stats) obj;
251259
return Objects.equals(id, other.id)
252260
&& Objects.equals(this.state, other.state)
261+
&& Objects.equals(this.failureReason, other.failureReason)
253262
&& Objects.equals(this.node, other.node)
254263
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation);
255264
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java

+32-4
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
package org.elasticsearch.xpack.core.ml.action;
77

88
import org.elasticsearch.Version;
9-
import org.elasticsearch.action.ActionType;
109
import org.elasticsearch.action.ActionRequestBuilder;
1110
import org.elasticsearch.action.ActionRequestValidationException;
11+
import org.elasticsearch.action.ActionType;
1212
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1313
import org.elasticsearch.action.support.master.MasterNodeRequest;
1414
import org.elasticsearch.client.ElasticsearchClient;
@@ -157,20 +157,32 @@ public static class TaskParams implements XPackPlugin.XPackPersistentTaskParams
157157
public static final Version VERSION_INTRODUCED = Version.V_7_3_0;
158158

159159
public static ConstructingObjectParser<TaskParams, Void> PARSER = new ConstructingObjectParser<>(
160-
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new TaskParams((String) a[0]));
160+
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new TaskParams((String) a[0], (String) a[1]));
161+
162+
static {
163+
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.ID);
164+
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.VERSION);
165+
}
161166

162167
public static TaskParams fromXContent(XContentParser parser) {
163168
return PARSER.apply(parser, null);
164169
}
165170

166-
private String id;
171+
private final String id;
172+
private final Version version;
167173

168-
public TaskParams(String id) {
174+
public TaskParams(String id, Version version) {
169175
this.id = Objects.requireNonNull(id);
176+
this.version = Objects.requireNonNull(version);
177+
}
178+
179+
private TaskParams(String id, String version) {
180+
this(id, Version.fromString(version));
170181
}
171182

172183
public TaskParams(StreamInput in) throws IOException {
173184
this.id = in.readString();
185+
this.version = Version.readVersion(in);
174186
}
175187

176188
public String getId() {
@@ -190,15 +202,31 @@ public Version getMinimalSupportedVersion() {
190202
@Override
191203
public void writeTo(StreamOutput out) throws IOException {
192204
out.writeString(id);
205+
Version.writeVersion(version, out);
193206
}
194207

195208
@Override
196209
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
197210
builder.startObject();
198211
builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id);
212+
builder.field(DataFrameAnalyticsConfig.VERSION.getPreferredName(), version);
199213
builder.endObject();
200214
return builder;
201215
}
216+
217+
@Override
218+
public int hashCode() {
219+
return Objects.hash(id, version);
220+
}
221+
222+
@Override
223+
public boolean equals(Object o) {
224+
if (o == this) return true;
225+
if (o == null || getClass() != o.getClass()) return false;
226+
227+
TaskParams other = (TaskParams) o;
228+
return Objects.equals(id, other.id) && Objects.equals(version, other.version);
229+
}
202230
}
203231

204232
public interface TaskMatcher {

0 commit comments

Comments
 (0)