Skip to content

[ML] Set df-analytics task state to failed when appropriate #43880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,9 @@ static Request stopDataFrameAnalytics(StopDataFrameAnalyticsRequest stopRequest)
params.putParam(
StopDataFrameAnalyticsRequest.ALLOW_NO_MATCH.getPreferredName(), Boolean.toString(stopRequest.getAllowNoMatch()));
}
if (stopRequest.getForce() != null) {
params.putParam(StopDataFrameAnalyticsRequest.FORCE.getPreferredName(), Boolean.toString(stopRequest.getForce()));
}
request.addParameters(params.asMap());
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
public class StopDataFrameAnalyticsRequest implements Validatable {

public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
public static final ParseField FORCE = new ParseField("force");

private final String id;
private TimeValue timeout;
private Boolean allowNoMatch;
private Boolean force;
private TimeValue timeout;

public StopDataFrameAnalyticsRequest(String id) {
this.id = id;
Expand Down Expand Up @@ -62,6 +64,15 @@ public StopDataFrameAnalyticsRequest setAllowNoMatch(boolean allowNoMatch) {
return this;
}

/**
 * Returns the {@code force} flag configured on this request.
 *
 * @return the flag value, or {@code null} if it has not been set
 */
public Boolean getForce() {
    return this.force;
}

/**
 * Sets the {@code force} flag on this request.
 *
 * @param force the value to store for the {@code force} parameter
 * @return this request, so that calls can be chained
 */
public StopDataFrameAnalyticsRequest setForce(boolean force) {
    this.force = Boolean.valueOf(force);
    return this;
}

@Override
public Optional<ValidationException> validate() {
if (id == null) {
Expand All @@ -78,11 +89,12 @@ public boolean equals(Object o) {
StopDataFrameAnalyticsRequest other = (StopDataFrameAnalyticsRequest) o;
return Objects.equals(id, other.id)
&& Objects.equals(timeout, other.timeout)
&& Objects.equals(allowNoMatch, other.allowNoMatch);
&& Objects.equals(allowNoMatch, other.allowNoMatch)
&& Objects.equals(force, other.force);
}

@Override
public int hashCode() {
return Objects.hash(id, timeout, allowNoMatch);
return Objects.hash(id, timeout, allowNoMatch, force);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws

static final ParseField ID = new ParseField("id");
static final ParseField STATE = new ParseField("state");
static final ParseField FAILURE_REASON = new ParseField("failure_reason");
static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent");
static final ParseField NODE = new ParseField("node");
static final ParseField ASSIGNMENT_EXPLANATION = new ParseField("assignment_explanation");
Expand All @@ -50,9 +51,10 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws
args -> new DataFrameAnalyticsStats(
(String) args[0],
(DataFrameAnalyticsState) args[1],
(Integer) args[2],
(NodeAttributes) args[3],
(String) args[4]));
(String) args[2],
(Integer) args[3],
(NodeAttributes) args[4],
(String) args[5]));

static {
PARSER.declareString(constructorArg(), ID);
Expand All @@ -62,21 +64,25 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, STATE, ObjectParser.ValueType.STRING);
PARSER.declareString(optionalConstructorArg(), FAILURE_REASON);
PARSER.declareInt(optionalConstructorArg(), PROGRESS_PERCENT);
PARSER.declareObject(optionalConstructorArg(), NodeAttributes.PARSER, NODE);
PARSER.declareString(optionalConstructorArg(), ASSIGNMENT_EXPLANATION);
}

private final String id;
private final DataFrameAnalyticsState state;
private final String failureReason;
private final Integer progressPercent;
private final NodeAttributes node;
private final String assignmentExplanation;

public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable Integer progressPercent,
@Nullable NodeAttributes node, @Nullable String assignmentExplanation) {
public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable String failureReason,
@Nullable Integer progressPercent, @Nullable NodeAttributes node,
@Nullable String assignmentExplanation) {
this.id = id;
this.state = state;
this.failureReason = failureReason;
this.progressPercent = progressPercent;
this.node = node;
this.assignmentExplanation = assignmentExplanation;
Expand All @@ -90,6 +96,10 @@ public DataFrameAnalyticsState getState() {
return state;
}

/**
 * Returns the failure reason carried by these stats.
 *
 * @return the failure reason, or {@code null} when none was provided
 */
public String getFailureReason() {
    return this.failureReason;
}

/**
 * Returns the progress percentage carried by these stats.
 *
 * @return the progress percentage, or {@code null} when none was provided
 */
public Integer getProgressPercent() {
    return this.progressPercent;
}
Expand All @@ -110,21 +120,23 @@ public boolean equals(Object o) {
DataFrameAnalyticsStats other = (DataFrameAnalyticsStats) o;
return Objects.equals(id, other.id)
&& Objects.equals(state, other.state)
&& Objects.equals(failureReason, other.failureReason)
&& Objects.equals(progressPercent, other.progressPercent)
&& Objects.equals(node, other.node)
&& Objects.equals(assignmentExplanation, other.assignmentExplanation);
}

@Override
public int hashCode() {
return Objects.hash(id, state, progressPercent, node, assignmentExplanation);
return Objects.hash(id, state, failureReason, progressPercent, node, assignmentExplanation);
}

@Override
public String toString() {
return new ToStringBuilder(getClass())
.add("id", id)
.add("state", state)
.add("failureReason", failureReason)
.add("progressPercent", progressPercent)
.add("node", node)
.add("assignmentExplanation", assignmentExplanation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,11 +758,15 @@ public void testStopDataFrameAnalytics() {
public void testStopDataFrameAnalytics_WithParams() {
StopDataFrameAnalyticsRequest stopRequest = new StopDataFrameAnalyticsRequest(randomAlphaOfLength(10))
.setTimeout(TimeValue.timeValueMinutes(1))
.setAllowNoMatch(false);
.setAllowNoMatch(false)
.setForce(true);
Request request = MLRequestConverters.stopDataFrameAnalytics(stopRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals("/_ml/data_frame/analytics/" + stopRequest.getId() + "/_stop", request.getEndpoint());
assertThat(request.getParameters(), allOf(hasEntry("timeout", "1m"), hasEntry("allow_no_match", "false")));
assertThat(request.getParameters(), allOf(
hasEntry("timeout", "1m"),
hasEntry("allow_no_match", "false"),
hasEntry("force", "true")));
assertNull(request.getEntity());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1359,6 +1359,7 @@ public void testGetDataFrameAnalyticsStats() throws Exception {
DataFrameAnalyticsStats stats = statsResponse.getAnalyticsStats().get(0);
assertThat(stats.getId(), equalTo(configId));
assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STOPPED));
assertNull(stats.getFailureReason());
assertNull(stats.getProgressPercent());
assertNull(stats.getNode());
assertNull(stats.getAssignmentExplanation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3112,6 +3112,10 @@ public void testStopDataFrameAnalytics() throws Exception {
StopDataFrameAnalyticsRequest request = new StopDataFrameAnalyticsRequest("my-analytics-config"); // <1>
// end::stop-data-frame-analytics-request

//tag::stop-data-frame-analytics-request-force
request.setForce(false); // <2>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is in a different doc tag callout, I think the doc build will fail. You may need to move

request.setForce(false); // <2>

up inside the // tag::stop-data-frame-analytics-request tag.

//end::stop-data-frame-analytics-request-force

// tag::stop-data-frame-analytics-execute
StopDataFrameAnalyticsResponse response = client.machineLearning().stopDataFrameAnalytics(request, RequestOptions.DEFAULT);
// end::stop-data-frame-analytics-execute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public static DataFrameAnalyticsStats randomDataFrameAnalyticsStats() {
return new DataFrameAnalyticsStats(
randomAlphaOfLengthBetween(1, 10),
randomFrom(DataFrameAnalyticsState.values()),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomIntBetween(0, 100),
randomBoolean() ? null : NodeAttributesTests.createRandom(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20));
Expand All @@ -52,6 +53,9 @@ public static void toXContent(DataFrameAnalyticsStats stats, XContentBuilder bui
builder.startObject();
builder.field(DataFrameAnalyticsStats.ID.getPreferredName(), stats.getId());
builder.field(DataFrameAnalyticsStats.STATE.getPreferredName(), stats.getState().value());
if (stats.getFailureReason() != null) {
builder.field(DataFrameAnalyticsStats.FAILURE_REASON.getPreferredName(), stats.getFailureReason());
}
if (stats.getProgressPercent() != null) {
builder.field(DataFrameAnalyticsStats.PROGRESS_PERCENT.getPreferredName(), stats.getProgressPercent());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ A +{request}+ object requires a {dataframe-analytics-config} id.
include-tagged::{doc-tests-file}[{api}-request]
---------------------------------------------------
<1> Constructing a new stop request referencing an existing {dataframe-analytics-config}
<2> Optionally used to stop a failed task

include::../execution.asciidoc[]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
Expand Down Expand Up @@ -158,16 +158,19 @@ public static class Stats implements ToXContentObject, Writeable {
private final String id;
private final DataFrameAnalyticsState state;
@Nullable
private final String failureReason;
@Nullable
private final Integer progressPercentage;
@Nullable
private final DiscoveryNode node;
@Nullable
private final String assignmentExplanation;

public Stats(String id, DataFrameAnalyticsState state, @Nullable Integer progressPercentage,
public Stats(String id, DataFrameAnalyticsState state, @Nullable String failureReason, @Nullable Integer progressPercentage,
@Nullable DiscoveryNode node, @Nullable String assignmentExplanation) {
this.id = Objects.requireNonNull(id);
this.state = Objects.requireNonNull(state);
this.failureReason = failureReason;
this.progressPercentage = progressPercentage;
this.node = node;
this.assignmentExplanation = assignmentExplanation;
Expand All @@ -176,6 +179,7 @@ public Stats(String id, DataFrameAnalyticsState state, @Nullable Integer progres
public Stats(StreamInput in) throws IOException {
id = in.readString();
state = DataFrameAnalyticsState.fromStream(in);
failureReason = in.readOptionalString();
progressPercentage = in.readOptionalInt();
node = in.readOptionalWriteable(DiscoveryNode::new);
assignmentExplanation = in.readOptionalString();
Expand All @@ -202,6 +206,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOException {
builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id);
builder.field("state", state.toString());
if (failureReason != null) {
builder.field("failure_reason", failureReason);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may be prudent to use the more generic name `reason` for this field. If `state` is `failed`, then we already know it failed. If we ever decide to populate `reason` with other information (to indicate upgrading, stopping, etc.), it would be good to be able to reuse the same field.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But in the short term, while we only populate the field when the job has failed, we might get questions about why `reason` is empty most of the time.

For the question of why it's not assigned to a node during an upgrade this will already be available in assignment_explanation.

I guess the question is whether to combine assignment_explanation and failure_reason into a single reason field. I'm happy to keep them separate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking was to avoid having a state object. And since the stats object already has `assignment_explanation`, a generic `reason` field seems confusing. If we need to add more things in the future, I think we'll have to rethink the object structure a bit.

}
if (progressPercentage != null) {
builder.field("progress_percent", progressPercentage);
}
Expand Down Expand Up @@ -229,14 +236,15 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc
/**
 * Serializes these stats to the given stream.
 *
 * The fields are written in the same order in which the Stats(StreamInput)
 * constructor reads them back: id, state, failureReason, progressPercentage,
 * node, assignmentExplanation. Keep the two in sync when adding a field.
 */
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
state.writeTo(out);
// The remaining fields are nullable and therefore use the "optional" writers.
out.writeOptionalString(failureReason);
out.writeOptionalInt(progressPercentage);
out.writeOptionalWriteable(node);
out.writeOptionalString(assignmentExplanation);
}

@Override
public int hashCode() {
return Objects.hash(id, state, progressPercentage, node, assignmentExplanation);
return Objects.hash(id, state, failureReason, progressPercentage, node, assignmentExplanation);
}

@Override
Expand All @@ -250,6 +258,7 @@ public boolean equals(Object obj) {
Stats other = (Stats) obj;
return Objects.equals(id, other.id)
&& Objects.equals(this.state, other.state)
&& Objects.equals(this.failureReason, other.failureReason)
&& Objects.equals(this.node, other.node)
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.ElasticsearchClient;
Expand Down Expand Up @@ -157,20 +157,32 @@ public static class TaskParams implements XPackPlugin.XPackPersistentTaskParams
public static final Version VERSION_INTRODUCED = Version.V_7_3_0;

public static ConstructingObjectParser<TaskParams, Void> PARSER = new ConstructingObjectParser<>(
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new TaskParams((String) a[0]));
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new TaskParams((String) a[0], (String) a[1]));

// Registers the non-optional constructor arguments of the parser. Declaration
// order matters: the parsed values are handed to the PARSER builder lambda
// positionally, as a[0] (id) and a[1] (version).
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.VERSION);
}

/**
 * Parses a {@code TaskParams} instance from the given parser by delegating to {@link #PARSER}.
 *
 * @param parser the parser to read from
 * @return the parsed task parameters
 */
public static TaskParams fromXContent(XContentParser parser) {
    final TaskParams parsed = PARSER.apply(parser, null);
    return parsed;
}

private String id;
private final String id;
private final Version version;

public TaskParams(String id) {
public TaskParams(String id, Version version) {
this.id = Objects.requireNonNull(id);
this.version = Objects.requireNonNull(version);
}

// Constructor used by PARSER, which supplies both arguments as strings: converts
// the version string with Version.fromString and delegates to the
// (String, Version) constructor.
private TaskParams(String id, String version) {
this(id, Version.fromString(version));
}

/**
 * Reads the task parameters from a stream: first the id, then the version.
 *
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
public TaskParams(StreamInput in) throws IOException {
    id = in.readString();
    version = Version.readVersion(in);
}

public String getId() {
Expand All @@ -190,15 +202,31 @@ public Version getMinimalSupportedVersion() {
/**
 * Serializes the task parameters: the id followed by the version. This order
 * mirrors the reads performed by the TaskParams(StreamInput) constructor.
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
Version.writeVersion(version, out);
}

/**
 * Renders the task parameters as an object holding the id and version fields.
 *
 * @param builder the builder to write to
 * @param params  rendering parameters (not consulted here)
 * @return the same builder that was passed in
 * @throws IOException if writing to the builder fails
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    final String idFieldName = DataFrameAnalyticsConfig.ID.getPreferredName();
    final String versionFieldName = DataFrameAnalyticsConfig.VERSION.getPreferredName();

    builder.startObject();
    builder.field(idFieldName, id);
    builder.field(versionFieldName, version);
    builder.endObject();
    return builder;
}

/**
 * Computes the hash from the same fields that {@code equals} compares: id and version.
 */
@Override
public int hashCode() {
    return Objects.hash(this.id, this.version);
}

/**
 * Two {@code TaskParams} are equal when they are of the same class and have equal id and version.
 */
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null || o.getClass() != getClass()) {
        return false;
    }

    final TaskParams that = (TaskParams) o;
    return Objects.equals(id, that.id)
        && Objects.equals(version, that.version);
}
}

public interface TaskMatcher {
Expand Down
Loading