From 164b36559402bd32385fe37d8498deffe0cc74ba Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 1 Jul 2019 19:00:04 +0300 Subject: [PATCH 1/3] [ML] Set df-analytics task state to failed when appropriate This introduces a `failed` state to which the data frame analytics persistent task is set when something unexpected fails. It could be the process crashing, the results processor hitting some error, etc. The failure message is then captured and set on the task state. From there, it becomes available via the _stats API as `failure_reason`. The df-analytics stop API now has a `force` boolean parameter. This allows the user to call it for a failed task in order to reset it to `stopped` after we have ensured the failure has been communicated to the user. This commit also adds the analytics version in the persistent task params as this allows us to prevent tasks from running on unsuitable nodes in the future. --- .../client/MLRequestConverters.java | 3 + .../ml/StopDataFrameAnalyticsRequest.java | 18 ++- .../client/MLRequestConvertersTests.java | 8 +- .../MlClientDocumentationIT.java | 4 + .../ml/stop-data-frame-analytics.asciidoc | 1 + .../GetDataFrameAnalyticsStatsAction.java | 15 ++- .../action/StartDataFrameAnalyticsAction.java | 36 +++++- .../action/StopDataFrameAnalyticsAction.java | 48 +++++--- .../ml/dataframe/DataFrameAnalyticsState.java | 10 +- .../DataFrameAnalyticsTaskState.java | 24 +++- ...rameAnalyticsStatsActionResponseTests.java | 3 +- ...taFrameAnalyticsActionTaskParamsTests.java | 37 ++++++ .../StopDataFrameAnalyticsRequestTests.java | 3 + .../DataFrameAnalyticsStateTests.java | 41 +++++++ .../DataFrameAnalyticsTaskStateTests.java | 35 ++++++ .../xpack/ml/MachineLearning.java | 2 +- ...sportGetDataFrameAnalyticsStatsAction.java | 8 +- ...ransportStartDataFrameAnalyticsAction.java | 42 +++++-- ...TransportStopDataFrameAnalyticsAction.java | 39 +++++-- .../dataframe/DataFrameAnalyticsManager.java | 4 +- .../process/AnalyticsProcessFactory.java | 
5 +- .../process/AnalyticsProcessManager.java | 110 ++++++++++++++---- .../process/AnalyticsResultProcessor.java | 8 ++ .../process/DataFrameRowsJoiner.java | 28 ++--- .../NativeAnalyticsProcessFactory.java | 6 +- .../xpack/ml/job/JobNodeSelector.java | 5 +- .../RestStopDataFrameAnalyticsAction.java | 15 +-- ...portStopDataFrameAnalyticsActionTests.java | 85 ++++++++++++++ .../xpack/ml/job/JobNodeSelectorTests.java | 5 +- .../ml/process/MlMemoryTrackerTests.java | 3 +- .../api/ml.stop_data_frame_analytics.json | 5 + 31 files changed, 545 insertions(+), 111 deletions(-) create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsActionTaskParamsTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsStateTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskStateTests.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsActionTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java index e5a98b4632432..651851e345df9 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java @@ -676,6 +676,9 @@ static Request stopDataFrameAnalytics(StopDataFrameAnalyticsRequest stopRequest) params.putParam( StopDataFrameAnalyticsRequest.ALLOW_NO_MATCH.getPreferredName(), Boolean.toString(stopRequest.getAllowNoMatch())); } + if (stopRequest.getForce() != null) { + params.putParam(StopDataFrameAnalyticsRequest.FORCE.getPreferredName(), Boolean.toString(stopRequest.getForce())); + } request.addParameters(params.asMap()); return request; } diff --git 
a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StopDataFrameAnalyticsRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StopDataFrameAnalyticsRequest.java index 9608d40fc7d16..4ba6af852f61c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StopDataFrameAnalyticsRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StopDataFrameAnalyticsRequest.java @@ -31,10 +31,12 @@ public class StopDataFrameAnalyticsRequest implements Validatable { public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match"); + public static final ParseField FORCE = new ParseField("force"); private final String id; - private TimeValue timeout; private Boolean allowNoMatch; + private Boolean force; + private TimeValue timeout; public StopDataFrameAnalyticsRequest(String id) { this.id = id; @@ -62,6 +64,15 @@ public StopDataFrameAnalyticsRequest setAllowNoMatch(boolean allowNoMatch) { return this; } + public Boolean getForce() { + return force; + } + + public StopDataFrameAnalyticsRequest setForce(boolean force) { + this.force = force; + return this; + } + @Override public Optional validate() { if (id == null) { @@ -78,11 +89,12 @@ public boolean equals(Object o) { StopDataFrameAnalyticsRequest other = (StopDataFrameAnalyticsRequest) o; return Objects.equals(id, other.id) && Objects.equals(timeout, other.timeout) - && Objects.equals(allowNoMatch, other.allowNoMatch); + && Objects.equals(allowNoMatch, other.allowNoMatch) + && Objects.equals(force, other.force); } @Override public int hashCode() { - return Objects.hash(id, timeout, allowNoMatch); + return Objects.hash(id, timeout, allowNoMatch, force); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java index 9bb2bb42fd9d7..aff25b4aa2d3b 100644 --- 
a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java @@ -758,11 +758,15 @@ public void testStopDataFrameAnalytics() { public void testStopDataFrameAnalytics_WithParams() { StopDataFrameAnalyticsRequest stopRequest = new StopDataFrameAnalyticsRequest(randomAlphaOfLength(10)) .setTimeout(TimeValue.timeValueMinutes(1)) - .setAllowNoMatch(false); + .setAllowNoMatch(false) + .setForce(true); Request request = MLRequestConverters.stopDataFrameAnalytics(stopRequest); assertEquals(HttpPost.METHOD_NAME, request.getMethod()); assertEquals("/_ml/data_frame/analytics/" + stopRequest.getId() + "/_stop", request.getEndpoint()); - assertThat(request.getParameters(), allOf(hasEntry("timeout", "1m"), hasEntry("allow_no_match", "false"))); + assertThat(request.getParameters(), allOf( + hasEntry("timeout", "1m"), + hasEntry("allow_no_match", "false"), + hasEntry("force", "true"))); assertNull(request.getEntity()); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index e7cbaa743b544..7c5f0b8ccf6bf 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -3112,6 +3112,10 @@ public void testStopDataFrameAnalytics() throws Exception { StopDataFrameAnalyticsRequest request = new StopDataFrameAnalyticsRequest("my-analytics-config"); // <1> // end::stop-data-frame-analytics-request + //tag::stop-data-frame-analytics-request-force + request.setForce(false); // <2> + //end::stop-data-frame-analytics-request-force + // tag::stop-data-frame-analytics-execute StopDataFrameAnalyticsResponse response = 
client.machineLearning().stopDataFrameAnalytics(request, RequestOptions.DEFAULT); // end::stop-data-frame-analytics-execute diff --git a/docs/java-rest/high-level/ml/stop-data-frame-analytics.asciidoc b/docs/java-rest/high-level/ml/stop-data-frame-analytics.asciidoc index 243c075e18b03..3a06f268836f6 100644 --- a/docs/java-rest/high-level/ml/stop-data-frame-analytics.asciidoc +++ b/docs/java-rest/high-level/ml/stop-data-frame-analytics.asciidoc @@ -19,6 +19,7 @@ A +{request}+ object requires a {dataframe-analytics-config} id. include-tagged::{doc-tests-file}[{api}-request] --------------------------------------------------- <1> Constructing a new stop request referencing an existing {dataframe-analytics-config} +<2> Optionally used to stop a failed task include::../execution.asciidoc[] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java index e92abb2619f5f..a0e70463c52e1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java @@ -6,9 +6,9 @@ package org.elasticsearch.xpack.core.ml.action; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; @@ -158,16 +158,19 @@ public static class Stats implements ToXContentObject, Writeable { private final String id; private final DataFrameAnalyticsState state; @Nullable + private final 
String failureReason; + @Nullable private final Integer progressPercentage; @Nullable private final DiscoveryNode node; @Nullable private final String assignmentExplanation; - public Stats(String id, DataFrameAnalyticsState state, @Nullable Integer progressPercentage, + public Stats(String id, DataFrameAnalyticsState state, @Nullable String failureReason, @Nullable Integer progressPercentage, @Nullable DiscoveryNode node, @Nullable String assignmentExplanation) { this.id = Objects.requireNonNull(id); this.state = Objects.requireNonNull(state); + this.failureReason = failureReason; this.progressPercentage = progressPercentage; this.node = node; this.assignmentExplanation = assignmentExplanation; @@ -176,6 +179,7 @@ public Stats(String id, DataFrameAnalyticsState state, @Nullable Integer progres public Stats(StreamInput in) throws IOException { id = in.readString(); state = DataFrameAnalyticsState.fromStream(in); + failureReason = in.readOptionalString(); progressPercentage = in.readOptionalInt(); node = in.readOptionalWriteable(DiscoveryNode::new); assignmentExplanation = in.readOptionalString(); @@ -202,6 +206,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOException { builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id); builder.field("state", state.toString()); + if (failureReason != null) { + builder.field("failure_reason", failureReason); + } if (progressPercentage != null) { builder.field("progress_percent", progressPercentage); } @@ -229,6 +236,7 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc public void writeTo(StreamOutput out) throws IOException { out.writeString(id); state.writeTo(out); + out.writeOptionalString(failureReason); out.writeOptionalInt(progressPercentage); out.writeOptionalWriteable(node); out.writeOptionalString(assignmentExplanation); @@ -236,7 +244,7 @@ public void 
writeTo(StreamOutput out) throws IOException { @Override public int hashCode() { - return Objects.hash(id, state, progressPercentage, node, assignmentExplanation); + return Objects.hash(id, state, failureReason, progressPercentage, node, assignmentExplanation); } @Override @@ -250,6 +258,7 @@ public boolean equals(Object obj) { Stats other = (Stats) obj; return Objects.equals(id, other.id) && Objects.equals(this.state, other.state) + && Objects.equals(this.failureReason, other.failureReason) && Objects.equals(this.node, other.node) && Objects.equals(this.assignmentExplanation, other.assignmentExplanation); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java index 8aa9b52ce5338..b0a45c3742188 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java @@ -6,9 +6,9 @@ package org.elasticsearch.xpack.core.ml.action; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.ElasticsearchClient; @@ -157,20 +157,32 @@ public static class TaskParams implements XPackPlugin.XPackPersistentTaskParams public static final Version VERSION_INTRODUCED = Version.V_7_3_0; public static ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new TaskParams((String) a[0])); + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new 
TaskParams((String) a[0], (String) a[1])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.ID); + PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.VERSION); + } public static TaskParams fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } - private String id; + private final String id; + private final Version version; - public TaskParams(String id) { + public TaskParams(String id, Version version) { this.id = Objects.requireNonNull(id); + this.version = Objects.requireNonNull(version); + } + + private TaskParams(String id, String version) { + this(id, Version.fromString(version)); } public TaskParams(StreamInput in) throws IOException { this.id = in.readString(); + this.version = Version.readVersion(in); } public String getId() { @@ -190,15 +202,31 @@ public Version getMinimalSupportedVersion() { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); + Version.writeVersion(version, out); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id); + builder.field(DataFrameAnalyticsConfig.VERSION.getPreferredName(), version); builder.endObject(); return builder; } + + @Override + public int hashCode() { + return Objects.hash(id, version); + } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if (o == null || getClass() != o.getClass()) return false; + + TaskParams other = (TaskParams) o; + return Objects.equals(id, other.id) && Objects.equals(version, other.version); + } } public interface TaskMatcher { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsAction.java index 
74f7c5e07b559..3e671a10e5f0e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsAction.java @@ -5,9 +5,9 @@ */ package org.elasticsearch.xpack.core.ml.action; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; @@ -49,14 +49,17 @@ public Writeable.Reader getResponseReader() { public static class Request extends BaseTasksRequest implements ToXContentObject { - public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match"); + public static final ParseField FORCE = new ParseField("force"); + public static final ParseField TIMEOUT = new ParseField("timeout"); private static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); static { PARSER.declareString((request, id) -> request.id = id, DataFrameAnalyticsConfig.ID); PARSER.declareString((request, val) -> request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + PARSER.declareBoolean(Request::setAllowNoMatch, ALLOW_NO_MATCH); + PARSER.declareBoolean(Request::setForce, FORCE); } public static Request parseRequest(String id, XContentParser parser) { @@ -71,8 +74,9 @@ public static Request parseRequest(String id, XContentParser parser) { } private String id; - private Set expandedIds = Collections.emptySet(); private boolean allowNoMatch = true; + private boolean force; + private Set expandedIds = Collections.emptySet(); public Request(String id) { setId(id); @@ -81,8 +85,9 @@ public 
Request(String id) { public Request(StreamInput in) throws IOException { super(in); id = in.readString(); - expandedIds = new HashSet<>(Arrays.asList(in.readStringArray())); allowNoMatch = in.readBoolean(); + force = in.readBoolean(); + expandedIds = new HashSet<>(Arrays.asList(in.readStringArray())); } public Request() {} @@ -95,6 +100,22 @@ public String getId() { return id; } + public boolean allowNoMatch() { + return allowNoMatch; + } + + public void setAllowNoMatch(boolean allowNoMatch) { + this.allowNoMatch = allowNoMatch; + } + + public boolean isForce() { + return force; + } + + public void setForce(boolean force) { + this.force = force; + } + @Nullable public Set getExpandedIds() { return expandedIds; @@ -104,14 +125,6 @@ public void setExpandedIds(Set expandedIds) { this.expandedIds = Objects.requireNonNull(expandedIds); } - public boolean allowNoMatch() { - return allowNoMatch; - } - - public void setAllowNoMatch(boolean allowNoMatch) { - this.allowNoMatch = allowNoMatch; - } - @Override public ActionRequestValidationException validate() { return null; @@ -121,8 +134,9 @@ public ActionRequestValidationException validate() { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(id); - out.writeStringArray(expandedIds.toArray(new String[0])); out.writeBoolean(allowNoMatch); + out.writeBoolean(force); + out.writeStringArray(expandedIds.toArray(new String[0])); } @Override @@ -131,12 +145,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .startObject() .field(DataFrameAnalyticsConfig.ID.getPreferredName(), id) .field(ALLOW_NO_MATCH.getPreferredName(), allowNoMatch) + .field(FORCE.getPreferredName(), force) .endObject(); } @Override public int hashCode() { - return Objects.hash(id, getTimeout(), expandedIds, allowNoMatch); + return Objects.hash(id, getTimeout(), allowNoMatch, force, expandedIds); } @Override @@ -150,8 +165,9 @@ public boolean equals(Object obj) { 
StopDataFrameAnalyticsAction.Request other = (StopDataFrameAnalyticsAction.Request) obj; return Objects.equals(id, other.id) && Objects.equals(getTimeout(), other.getTimeout()) - && Objects.equals(expandedIds, other.expandedIds) - && allowNoMatch == other.allowNoMatch; + && allowNoMatch == other.allowNoMatch + && force == other.force + && Objects.equals(expandedIds, other.expandedIds); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsState.java index d40df259eec57..8f0a481ed9030 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsState.java @@ -10,11 +10,12 @@ import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; +import java.util.Arrays; import java.util.Locale; public enum DataFrameAnalyticsState implements Writeable { - STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED; + STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, FAILED; public static DataFrameAnalyticsState fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); @@ -33,4 +34,11 @@ public void writeTo(StreamOutput out) throws IOException { public String toString() { return name().toLowerCase(Locale.ROOT); } + + /** + * @return {@code true} if state matches any of the given {@code candidates} + */ + public boolean isAnyOf(DataFrameAnalyticsState... 
candidates) { + return Arrays.stream(candidates).anyMatch(candidate -> this == candidate); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskState.java index 994faaaee6cc2..188f3a2bdf2f6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskState.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.dataframe; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -25,13 +26,15 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState { private static ParseField STATE = new ParseField("state"); private static ParseField ALLOCATION_ID = new ParseField("allocation_id"); + private static ParseField REASON = new ParseField("reason"); private final DataFrameAnalyticsState state; private final long allocationId; + private final String reason; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, - a -> new DataFrameAnalyticsTaskState((DataFrameAnalyticsState) a[0], (long) a[1])); + a -> new DataFrameAnalyticsTaskState((DataFrameAnalyticsState) a[0], (long) a[1], (String) a[2])); static { PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { @@ -41,6 +44,7 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState { throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); }, STATE, ObjectParser.ValueType.STRING); PARSER.declareLong(ConstructingObjectParser.constructorArg(), ALLOCATION_ID); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), 
REASON); } public static DataFrameAnalyticsTaskState fromXContent(XContentParser parser) { @@ -51,20 +55,27 @@ public static DataFrameAnalyticsTaskState fromXContent(XContentParser parser) { } } - public DataFrameAnalyticsTaskState(DataFrameAnalyticsState state, long allocationId) { + public DataFrameAnalyticsTaskState(DataFrameAnalyticsState state, long allocationId, @Nullable String reason) { this.state = Objects.requireNonNull(state); this.allocationId = allocationId; + this.reason = reason; } public DataFrameAnalyticsTaskState(StreamInput in) throws IOException { this.state = DataFrameAnalyticsState.fromStream(in); this.allocationId = in.readLong(); + this.reason = in.readOptionalString(); } public DataFrameAnalyticsState getState() { return state; } + @Nullable + public String getReason() { + return reason; + } + public boolean isStatusStale(PersistentTasksCustomMetaData.PersistentTask task) { return allocationId != task.getAllocationId(); } @@ -78,6 +89,7 @@ public String getWriteableName() { public void writeTo(StreamOutput out) throws IOException { state.writeTo(out); out.writeLong(allocationId); + out.writeOptionalString(reason); } @Override @@ -85,6 +97,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(STATE.getPreferredName(), state.toString()); builder.field(ALLOCATION_ID.getPreferredName(), allocationId); + if (reason != null) { + builder.field(REASON.getPreferredName(), reason); + } builder.endObject(); return builder; } @@ -95,11 +110,12 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; DataFrameAnalyticsTaskState that = (DataFrameAnalyticsTaskState) o; return allocationId == that.allocationId && - state == that.state; + state == that.state && + Objects.equals(reason, that.reason); } @Override public int hashCode() { - return Objects.hash(state, allocationId); + return Objects.hash(state, allocationId, reason); } } diff --git 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java index e01618520f5a8..5a88f2ea52eab 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java @@ -23,8 +23,9 @@ protected Response createTestInstance() { List analytics = new ArrayList<>(listSize); for (int j = 0; j < listSize; j++) { Integer progressPercentage = randomBoolean() ? null : randomIntBetween(0, 100); + String failureReason = randomBoolean() ? null : randomAlphaOfLength(10); Response.Stats stats = new Response.Stats(DataFrameAnalyticsConfigTests.randomValidId(), - randomFrom(DataFrameAnalyticsState.values()), progressPercentage, null, randomAlphaOfLength(20)); + randomFrom(DataFrameAnalyticsState.values()), failureReason, progressPercentage, null, randomAlphaOfLength(20)); analytics.add(stats); } return new Response(new QueryPage<>(analytics, analytics.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsActionTaskParamsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsActionTaskParamsTests.java new file mode 100644 index 0000000000000..4af9a2a374c5d --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsActionTaskParamsTests.java @@ -0,0 +1,37 @@ + +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; + +public class StartDataFrameAnalyticsActionTaskParamsTests extends AbstractSerializingTestCase { + + @Override + protected StartDataFrameAnalyticsAction.TaskParams doParseInstance(XContentParser parser) throws IOException { + return StartDataFrameAnalyticsAction.TaskParams.fromXContent(parser); + } + + @Override + protected StartDataFrameAnalyticsAction.TaskParams createTestInstance() { + return new StartDataFrameAnalyticsAction.TaskParams(randomAlphaOfLength(10), Version.CURRENT); + } + + @Override + protected Writeable.Reader instanceReader() { + return StartDataFrameAnalyticsAction.TaskParams::new; + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsRequestTests.java index 9c61164c5f02a..f2942905fadf0 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsRequestTests.java @@ -24,6 +24,9 @@ protected Request createTestInstance() { if (randomBoolean()) { request.setAllowNoMatch(randomBoolean()); } + if (randomBoolean()) { + request.setForce(randomBoolean()); + } int expandedIdsCount = randomIntBetween(0, 10); Set expandedIds = new HashSet<>(); for (int i = 0; i < expandedIdsCount; i++) { diff --git 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsStateTests.java new file mode 100644 index 0000000000000..1b0800c91e3f6 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsStateTests.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.dataframe; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class DataFrameAnalyticsStateTests extends ESTestCase { + + public void testFromString() { + assertThat(DataFrameAnalyticsState.fromString("started"), equalTo(DataFrameAnalyticsState.STARTED)); + assertThat(DataFrameAnalyticsState.fromString("reindexing"), equalTo(DataFrameAnalyticsState.REINDEXING)); + assertThat(DataFrameAnalyticsState.fromString("analyzing"), equalTo(DataFrameAnalyticsState.ANALYZING)); + assertThat(DataFrameAnalyticsState.fromString("stopping"), equalTo(DataFrameAnalyticsState.STOPPING)); + assertThat(DataFrameAnalyticsState.fromString("stopped"), equalTo(DataFrameAnalyticsState.STOPPED)); + assertThat(DataFrameAnalyticsState.fromString("failed"), equalTo(DataFrameAnalyticsState.FAILED)); + } + + public void testToString() { + assertThat(DataFrameAnalyticsState.STARTED.toString(), equalTo("started")); + assertThat(DataFrameAnalyticsState.REINDEXING.toString(), equalTo("reindexing")); + assertThat(DataFrameAnalyticsState.ANALYZING.toString(), equalTo("analyzing")); + assertThat(DataFrameAnalyticsState.STOPPING.toString(), equalTo("stopping")); + 
assertThat(DataFrameAnalyticsState.STOPPED.toString(), equalTo("stopped")); + assertThat(DataFrameAnalyticsState.FAILED.toString(), equalTo("failed")); + } + + public void testIsAnyOf() { + assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(), is(false)); + assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(DataFrameAnalyticsState.STARTED), is(true)); + assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(DataFrameAnalyticsState.ANALYZING, DataFrameAnalyticsState.STOPPED), is(false)); + assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(DataFrameAnalyticsState.STARTED, DataFrameAnalyticsState.STOPPED), is(true)); + assertThat(DataFrameAnalyticsState.ANALYZING.isAnyOf(DataFrameAnalyticsState.STARTED, DataFrameAnalyticsState.STOPPED), is(false)); + assertThat(DataFrameAnalyticsState.ANALYZING.isAnyOf(DataFrameAnalyticsState.ANALYZING, DataFrameAnalyticsState.FAILED), is(true)); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskStateTests.java new file mode 100644 index 0000000000000..f6d2d421b3d9a --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskStateTests.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.core.ml.dataframe; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; + +public class DataFrameAnalyticsTaskStateTests extends AbstractSerializingTestCase { + + @Override + protected DataFrameAnalyticsTaskState createTestInstance() { + return new DataFrameAnalyticsTaskState(randomFrom(DataFrameAnalyticsState.values()), randomLong(), randomAlphaOfLength(10)); + } + + @Override + protected Writeable.Reader instanceReader() { + return DataFrameAnalyticsTaskState::new; + } + + @Override + protected DataFrameAnalyticsTaskState doParseInstance(XContentParser parser) throws IOException { + return DataFrameAnalyticsTaskState.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index caf5d0c17d2cb..3976a0e578f16 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -501,7 +501,7 @@ public Collection createComponents(Client client, ClusterService cluster new BlackHoleAutodetectProcess(job.getId()); // factor of 1.0 makes renormalization a no-op normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0); - analyticsProcessFactory = (jobId, analyticsProcessConfig, executorService) -> null; + analyticsProcessFactory = (jobId, analyticsProcessConfig, executorService, onProcessCrash) -> null; } NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index 575069e4fd4dc..8d139ba9b6ca4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction.Response.Stats; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsProcessManager; @@ -178,6 +179,11 @@ private GetDataFrameAnalyticsStatsAction.Response.Stats buildStats(String concre PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData.PersistentTask analyticsTask = MlTasks.getDataFrameAnalyticsTask(concreteAnalyticsId, tasks); DataFrameAnalyticsState analyticsState = MlTasks.getDataFrameAnalyticsState(concreteAnalyticsId, tasks); + String failureReason = null; + if (analyticsState == DataFrameAnalyticsState.FAILED) { + DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) analyticsTask.getState(); + failureReason = taskState.getReason(); + } DiscoveryNode node = null; String assignmentExplanation = null; if (analyticsTask != null) { @@ -185,6 +191,6 @@ private GetDataFrameAnalyticsStatsAction.Response.Stats buildStats(String concre assignmentExplanation = analyticsTask.getAssignment().getExplanation(); } return new 
GetDataFrameAnalyticsStatsAction.Response.Stats( - concreteAnalyticsId, analyticsState, progressPercent, node, assignmentExplanation); + concreteAnalyticsId, analyticsState, failureReason, progressPercent, node, assignmentExplanation); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 5665bd0181210..1d878adbc1d11 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -62,6 +62,7 @@ import java.io.IOException; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; @@ -131,8 +132,6 @@ protected void masterOperation(Task task, StartDataFrameAnalyticsAction.Request return; } - StartDataFrameAnalyticsAction.TaskParams taskParams = new StartDataFrameAnalyticsAction.TaskParams(request.getId()); - // Wait for analytics to be started ActionListener> waitForAnalyticsToStart = new ActionListener>() { @@ -151,17 +150,26 @@ public void onFailure(Exception e) { } }; + AtomicReference configHolder = new AtomicReference<>(); + // Start persistent task ActionListener memoryRequirementRefreshListener = ActionListener.wrap( - validated -> persistentTasksService.sendStartRequest(MlTasks.dataFrameAnalyticsTaskId(request.getId()), - MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, taskParams, waitForAnalyticsToStart), + aVoid -> { + StartDataFrameAnalyticsAction.TaskParams taskParams = new StartDataFrameAnalyticsAction.TaskParams( + request.getId(), configHolder.get().getVersion()); + 
persistentTasksService.sendStartRequest(MlTasks.dataFrameAnalyticsTaskId(request.getId()), + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, taskParams, waitForAnalyticsToStart); + }, listener::onFailure ); // Tell the job tracker to refresh the memory requirement for this job and all other jobs that have persistent tasks ActionListener configListener = ActionListener.wrap( - config -> memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers( - request.getId(), config.getModelMemoryLimit().getBytes(), memoryRequirementRefreshListener), + config -> { + configHolder.set(config); + memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers( + request.getId(), config.getModelMemoryLimit().getBytes(), memoryRequirementRefreshListener); + }, listener::onFailure ); @@ -251,7 +259,21 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask persistentTa } DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) persistentTask.getState(); DataFrameAnalyticsState analyticsState = taskState == null ? 
DataFrameAnalyticsState.STOPPED : taskState.getState(); - return analyticsState == DataFrameAnalyticsState.STARTED; + switch (analyticsState) { + case STARTED: + case REINDEXING: + case ANALYZING: + return true; + case STOPPING: + exception = ExceptionsHelper.conflictStatusException("the task has been stopped while waiting to be started"); + return true; + case STOPPED: + return false; + case FAILED: + default: + exception = ExceptionsHelper.serverError("Unexpected task state [" + analyticsState + "] while waiting to be started"); + return true; + } } } @@ -425,13 +447,15 @@ protected void nodeOperation(AllocatedPersistentTask task, StartDataFrameAnalyti DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state; // If we are "stopping" there is nothing to do - if (analyticsTaskState != null && analyticsTaskState.getState() == DataFrameAnalyticsState.STOPPING) { + // If we are "failed" then we should leave the task as is; for recovery it must be force stopped. + if (analyticsTaskState != null && analyticsTaskState.getState().isAnyOf( + DataFrameAnalyticsState.STOPPING, DataFrameAnalyticsState.FAILED)) { return; } if (analyticsTaskState == null) { DataFrameAnalyticsTaskState startedState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STARTED, - task.getAllocationId()); + task.getAllocationId(), null); task.updatePersistentTaskState(startedState, ActionListener.wrap( response -> manager.execute((DataFrameAnalyticsTask) task, DataFrameAnalyticsState.STARTED), task::markAsFailed)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java index 7c8222d83f3e3..b02d89c17766c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -31,6 +32,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; @@ -89,13 +91,10 @@ protected void doExecute(Task task, StopDataFrameAnalyticsAction.Request request return; } - Set startedAnalytics = new HashSet<>(); - Set stoppingAnalytics = new HashSet<>(); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - sortAnalyticsByTaskState(expandedIds, tasks, startedAnalytics, stoppingAnalytics); - - request.setExpandedIds(startedAnalytics); - request.setNodes(findAllocatedNodesAndRemoveUnassignedTasks(startedAnalytics, tasks)); + Set analyticsToStop = findAnalyticsToStop(tasks, expandedIds, request.isForce()); + request.setExpandedIds(analyticsToStop); + request.setNodes(findAllocatedNodesAndRemoveUnassignedTasks(analyticsToStop, tasks)); ActionListener finalListener = ActionListener.wrap( r -> waitForTaskRemoved(expandedIds, request, r, listener), @@ -110,8 +109,28 @@ protected void doExecute(Task task, StopDataFrameAnalyticsAction.Request request expandIds(state, request, expandedIdsListener); } + /** Visible for testing */ + static Set findAnalyticsToStop(PersistentTasksCustomMetaData tasks, Set ids, boolean force) 
{ + Set startedAnalytics = new HashSet<>(); + Set stoppingAnalytics = new HashSet<>(); + Set failedAnalytics = new HashSet<>(); + sortAnalyticsByTaskState(ids, tasks, startedAnalytics, stoppingAnalytics, failedAnalytics); + + if (force == false && failedAnalytics.isEmpty() == false) { + ElasticsearchStatusException e = failedAnalytics.size() == 1 ? ExceptionsHelper.conflictStatusException( + "cannot close data frame analytics [{}] because it failed, use force stop instead", failedAnalytics.iterator().next()) : + ExceptionsHelper.conflictStatusException("one or more data frame analytics are in failed state, " + + "use force stop instead"); + throw e; + } + + startedAnalytics.addAll(failedAnalytics); + return startedAnalytics; + } + private static void sortAnalyticsByTaskState(Set analyticsIds, PersistentTasksCustomMetaData tasks, - Set startedAnalytics, Set stoppingAnalytics) { + Set startedAnalytics, Set stoppingAnalytics, + Set failedAnalytics) { for (String analyticsId : analyticsIds) { switch (MlTasks.getDataFrameAnalyticsState(analyticsId, tasks)) { case STARTED: @@ -124,6 +143,9 @@ private static void sortAnalyticsByTaskState(Set analyticsIds, Persisten break; case STOPPED: break; + case FAILED: + failedAnalytics.add(analyticsId); + break; default: break; } @@ -203,7 +225,7 @@ protected void taskOperation(StopDataFrameAnalyticsAction.Request request, TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task, ActionListener listener) { DataFrameAnalyticsTaskState stoppingState = - new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STOPPING, task.getAllocationId()); + new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STOPPING, task.getAllocationId(), null); task.updatePersistentTaskState(stoppingState, ActionListener.wrap(pTask -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override @@ -213,6 +235,7 @@ public void onFailure(Exception e) { @Override protected void doRun() { + 
logger.info("[{}] Stopping task with force [{}]", task.getParams().getId(), request.isForce()); task.stop("stop_data_frame_analytics (api)", request.getTimeout()); listener.onResponse(new StopDataFrameAnalyticsAction.Response(true)); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 9132e0f8192d7..28f277dc84bcf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -70,7 +70,7 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current ActionListener configListener = ActionListener.wrap( config -> { DataFrameAnalyticsTaskState reindexingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.REINDEXING, - task.getAllocationId()); + task.getAllocationId(), null); switch(currentState) { // If we are STARTED, we are right at the beginning of our task, we should indicate that we are entering the // REINDEX state and start reindexing. 
@@ -191,7 +191,7 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi ActionListener dataExtractorFactoryListener = ActionListener.wrap( dataExtractorFactory -> { DataFrameAnalyticsTaskState analyzingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.ANALYZING, - task.getAllocationId()); + task.getAllocationId(), null); task.updatePersistentTaskState(analyzingState, ActionListener.wrap( updatedTask -> processManager.runJob(task, config, dataExtractorFactory, error -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java index d09757ddc5c74..1df29b88ba4e3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.dataframe.process; import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; public interface AnalyticsProcessFactory { @@ -15,7 +16,9 @@ public interface AnalyticsProcessFactory { * @param jobId The job id * @param analyticsProcessConfig The process configuration * @param executorService Executor service used to start the async tasks a job needs to operate the analytical process + * @param onProcessCrash Callback to execute if the process stops unexpectedly * @return The process */ - AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService); + AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService, + Consumer onProcessCrash); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index c1447f4d18b42..bb54895b2fa97 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.client.Client; @@ -15,9 +16,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction; +import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; @@ -46,7 +49,7 @@ public AnalyticsProcessManager(Client client, ThreadPool threadPool, AnalyticsPr this.processFactory = Objects.requireNonNull(analyticsProcessFactory); } - public void runJob(TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, + public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory, 
Consumer finishHandler) { threadPool.generic().execute(() -> { if (task.isStopping()) { @@ -61,10 +64,10 @@ public void runJob(TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask + "] Could not create process as one already exists")); return; } - if (processContext.startProcess(dataExtractorFactory, config)) { + if (processContext.startProcess(dataExtractorFactory, config, task)) { ExecutorService executorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME); - executorService.execute(() -> processContext.resultProcessor.process(processContext.process)); - executorService.execute(() -> processData(task.getAllocationId(), config, processContext.dataExtractor, + executorService.execute(() -> processResults(processContext)); + executorService.execute(() -> processData(task, config, processContext.dataExtractor, processContext.process, processContext.resultProcessor, finishHandler)); } else { finishHandler.accept(null); @@ -72,8 +75,17 @@ public void runJob(TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask }); } - private void processData(long taskAllocationId, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor, + private void processResults(ProcessContext processContext) { + try { + processContext.resultProcessor.process(processContext.process); + } catch (Exception e) { + processContext.setFailureReason(e.getMessage()); + } + } + + private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor, AnalyticsProcess process, AnalyticsResultProcessor resultProcessor, Consumer finishHandler) { + try { writeHeaderRecord(dataExtractor, process); writeDataRows(dataExtractor, process); @@ -82,26 +94,28 @@ private void processData(long taskAllocationId, DataFrameAnalyticsConfig config, LOGGER.info("[{}] Waiting for result processor to complete", config.getId()); resultProcessor.awaitForCompletion(); + 
processContextByAllocation.get(task.getAllocationId()).setFailureReason(resultProcessor.getFailure()); + refreshDest(config); LOGGER.info("[{}] Result processor has completed", config.getId()); - } catch (IOException e) { - LOGGER.error(new ParameterizedMessage("[{}] Error writing data to the process", config.getId()), e); - // TODO Handle this failure by setting the task state to FAILED + } catch (Exception e) { + String errorMsg = new ParameterizedMessage("[{}] Error while processing data", config.getId()).getFormattedMessage(); + processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg); } finally { - LOGGER.info("[{}] Closing process", config.getId()); - try { - process.close(); - LOGGER.info("[{}] Closed process", config.getId()); + closeProcess(task); + ProcessContext processContext = processContextByAllocation.remove(task.getAllocationId()); + LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", config.getId(), + processContextByAllocation.size()); + + if (processContext.getFailureReason() == null) { // This results in marking the persistent task as complete + LOGGER.info("[{}] Marking task completed", config.getId()); finishHandler.accept(null); - } catch (IOException e) { - LOGGER.error("[{}] Error closing data frame analyzer process", config.getId()); - finishHandler.accept(e); + } else { + LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason()); + updateTaskState(task, DataFrameAnalyticsState.FAILED, processContext.getFailureReason()); } - processContextByAllocation.remove(taskAllocationId); - LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", config.getId(), - processContextByAllocation.size()); } } @@ -142,15 +156,34 @@ private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsPr process.writeRecord(headerRecord); } - private AnalyticsProcess createProcess(String jobId, AnalyticsProcessConfig 
analyticsProcessConfig) { + private AnalyticsProcess createProcess(DataFrameAnalyticsTask task, AnalyticsProcessConfig analyticsProcessConfig) { ExecutorService executorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME); - AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, analyticsProcessConfig, executorService); + AnalyticsProcess process = processFactory.createAnalyticsProcess(task.getParams().getId(), analyticsProcessConfig, + executorService, onProcessCrash(task)); if (process.isProcessAlive() == false) { throw ExceptionsHelper.serverError("Failed to start data frame analytics process"); } return process; } + private Consumer onProcessCrash(DataFrameAnalyticsTask task) { + return reason -> { + ProcessContext processContext = processContextByAllocation.get(task.getAllocationId()); + if (processContext != null) { + processContext.setFailureReason(reason); + processContext.stop(); + } + }; + } + + private void updateTaskState(DataFrameAnalyticsTask task, DataFrameAnalyticsState state, @Nullable String reason) { + DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, task.getAllocationId(), reason); + task.updatePersistentTaskState(newTaskState, ActionListener.wrap( + updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", task.getParams().getId(), state), + e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]", task.getParams().getId(), state), e) + )); + } + @Nullable public Integer getProgressPercent(long allocationId) { ProcessContext processContext = processContextByAllocation.get(allocationId); @@ -162,13 +195,29 @@ private void refreshDest(DataFrameAnalyticsConfig config) { () -> client.execute(RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex())).actionGet()); } - public void stop(TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task) { + private void closeProcess(DataFrameAnalyticsTask task) { + String 
configId = task.getParams().getId(); + LOGGER.info("[{}] Closing process", configId); + + ProcessContext processContext = processContextByAllocation.get(task.getAllocationId()); + try { + processContext.process.close(); + LOGGER.info("[{}] Closed process", configId); + } catch (IOException e) { + String errorMsg = new ParameterizedMessage("[{}] Error closing data frame analyzer process [{}]" + , configId, e.getMessage()).getFormattedMessage(); + processContext.setFailureReason(errorMsg); + } + } + + public void stop(DataFrameAnalyticsTask task) { ProcessContext processContext = processContextByAllocation.get(task.getAllocationId()); if (processContext != null) { LOGGER.debug("[{}] Stopping process", task.getParams().getId() ); processContext.stop(); } else { LOGGER.debug("[{}] No process context to stop", task.getParams().getId() ); + task.markAsCompleted(); } } @@ -180,6 +229,7 @@ class ProcessContext { private volatile AnalyticsResultProcessor resultProcessor; private final AtomicInteger progressPercent = new AtomicInteger(0); private volatile boolean processKilled; + private volatile String failureReason; ProcessContext(String id) { this.id = Objects.requireNonNull(id); @@ -197,6 +247,17 @@ void setProgressPercent(int progressPercent) { this.progressPercent.set(progressPercent); } + private synchronized void setFailureReason(String failureReason) { + // Only set the new reason if there isn't one already as we want to keep the first reason + if (failureReason != null) { + this.failureReason = failureReason; + } + } + + private String getFailureReason() { + return failureReason; + } + public synchronized void stop() { LOGGER.debug("[{}] Stopping process", id); processKilled = true; @@ -215,14 +276,15 @@ public synchronized void stop() { /** * @return {@code true} if the process was started or {@code false} if it was not because it was stopped in the meantime */ - private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, 
DataFrameAnalyticsConfig config) { + private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config, + DataFrameAnalyticsTask task) { if (processKilled) { // The job was stopped before we started the process so no need to start it return false; } dataExtractor = dataExtractorFactory.newExtractor(false); - process = createProcess(config.getId(), createProcessConfig(config, dataExtractor)); + process = createProcess(task, createProcessConfig(config, dataExtractor)); DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client, dataExtractorFactory.newExtractor(true)); resultProcessor = new AnalyticsResultProcessor(id, dataFrameRowsJoiner, this::isProcessKilled, this::setProgressPercent); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java index f9b131393541a..11c451e9c3932 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.Nullable; import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults; import java.util.Iterator; @@ -26,6 +27,7 @@ public class AnalyticsResultProcessor { private final Supplier isProcessKilled; private final Consumer progressConsumer; private final CountDownLatch completionLatch = new CountDownLatch(1); + private volatile String failure; public AnalyticsResultProcessor(String dataFrameAnalyticsId, DataFrameRowsJoiner dataFrameRowsJoiner, Supplier isProcessKilled, Consumer progressConsumer) { @@ -35,6 +37,11 
@@ public AnalyticsResultProcessor(String dataFrameAnalyticsId, DataFrameRowsJoiner this.progressConsumer = Objects.requireNonNull(progressConsumer); } + @Nullable + public String getFailure() { + return failure == null ? dataFrameRowsJoiner.getFailure() : failure; + } + public void awaitForCompletion() { try { if (completionLatch.await(30, TimeUnit.MINUTES) == false) { @@ -59,6 +66,7 @@ public void process(AnalyticsProcess process) { // No need to log error as it's due to stopping } else { LOGGER.error(new ParameterizedMessage("[{}] Error parsing data frame analytics output", dataFrameAnalyticsId), e); + failure = "error parsing data frame analytics output: [" + e.getMessage() + "]"; } } finally { completionLatch.countDown(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java index ef943820374ea..1b3dd2932ab9e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -41,7 +42,7 @@ class DataFrameRowsJoiner implements AutoCloseable { private final DataFrameDataExtractor dataExtractor; private final Iterator dataFrameRowsIterator; private LinkedList currentResults; - private boolean failed; + private volatile String failure; DataFrameRowsJoiner(String analyticsId, Client client, DataFrameDataExtractor dataExtractor) { this.analyticsId = Objects.requireNonNull(analyticsId); @@ -51,8 
+52,13 @@ class DataFrameRowsJoiner implements AutoCloseable { this.currentResults = new LinkedList<>(); } + @Nullable + String getFailure() { + return failure; + } + void processRowResults(RowResults rowResults) { - if (failed) { + if (failure != null) { // If we are in failed state we drop the results but we let the processor // parse the output return; @@ -61,8 +67,8 @@ void processRowResults(RowResults rowResults) { try { addResultAndJoinIfEndOfBatch(rowResults); } catch (Exception e) { - LOGGER.error(new ParameterizedMessage("[{}] Failed to join results", analyticsId), e); - failed = true; + LOGGER.error(new ParameterizedMessage("[{}] Failed to join results", analyticsId), e); + failure = "[" + analyticsId + "] Failed to join results: " + e.getMessage(); } } @@ -93,8 +99,7 @@ private void checkChecksumsMatch(DataFrameDataExtractor.Row row, RowResults resu msg += "expected [" + row.getChecksum() + "] but result had [" + result.getChecksum() + "]; "; msg += "this implies the data frame index [" + row.getHit().getIndex() + "] was modified while the analysis was running. "; msg += "We rely on this index being immutable during a running analysis and so the results will be unreliable."; - throw new RuntimeException(msg); - // TODO Communicate this error to the user as effectively the analytics have failed (e.g. FAILED state, audit error, etc.)
+ throw ExceptionsHelper.serverError(msg); } } @@ -112,8 +117,7 @@ private void executeBulkRequest(BulkRequest bulkRequest) { BulkResponse bulkResponse = ClientHelper.executeWithHeaders(dataExtractor.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> client.execute(BulkAction.INSTANCE, bulkRequest).actionGet()); if (bulkResponse.hasFailures()) { - LOGGER.error("Failures while writing data frame"); - // TODO Better error handling + throw ExceptionsHelper.serverError("failures while writing results [" + bulkResponse.buildFailureMessage() + "]"); } } @@ -123,7 +127,7 @@ public void close() { joinCurrentResults(); } catch (Exception e) { LOGGER.error(new ParameterizedMessage("[{}] Failed to join results", analyticsId), e); - failed = true; + failure = "[" + analyticsId + "] Failed to join results: " + e.getMessage(); } finally { try { consumeDataExtractor(); @@ -159,7 +163,7 @@ public DataFrameDataExtractor.Row next() { } if (row == null || row.shouldSkip()) { - throw ExceptionsHelper.serverError("No more data frame rows could be found while joining results"); + throw ExceptionsHelper.serverError("no more data frame rows could be found while joining results"); } return row; } @@ -175,9 +179,7 @@ private Optional> getNextDataRowsBatch() { try { return dataExtractor.next(); } catch (IOException e) { - // TODO Implement recovery strategy or better error reporting - LOGGER.error("Error reading next batch of data frame rows", e); - return Optional.empty(); + throw ExceptionsHelper.serverError("error reading next batch of data frame rows [" + e.getMessage() + "]"); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index e5554e3dcd01a..4f765e68b73f4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { @@ -50,7 +51,7 @@ void setProcessConnectTimeout(TimeValue processConnectTimeout) { @Override public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, - ExecutorService executorService) { + ExecutorService executorService, Consumer onProcessCrash) { List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId, true, false, true, true, false, false); @@ -62,8 +63,7 @@ public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessCon NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, nativeController, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, numberOfFields, - filesToDelete, reason -> {}); - + filesToDelete, onProcessCrash); try { analyticsProcess.start(executorService); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java index b22ad9482e443..aa97c13b21d69 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java @@ -265,8 +265,9 @@ private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTa MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode())); for (PersistentTasksCustomMetaData.PersistentTask assignedTask : assignedAnalyticsTasks) { DataFrameAnalyticsState 
dataFrameAnalyticsState = ((DataFrameAnalyticsTaskState) assignedTask.getState()).getState(); - // TODO: skip FAILED here too if such a state is ever added - if (dataFrameAnalyticsState != DataFrameAnalyticsState.STOPPED) { + + // Don't count stopped and failed df-analytics tasks as they don't consume native memory + if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) { // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED // and REINDEXING states we're committed to using the memory soon, so account for it here ++result.numberOfAssignedJobs; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestStopDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestStopDataFrameAnalyticsAction.java index 8a399c736c92e..65f1d40273540 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestStopDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestStopDataFrameAnalyticsAction.java @@ -7,7 +7,6 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; @@ -39,15 +38,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient request = StopDataFrameAnalyticsAction.Request.parseRequest(id, restRequest.contentOrSourceParamParser()); } else { request = new StopDataFrameAnalyticsAction.Request(id); - if (restRequest.hasParam(StopDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName())) { - TimeValue timeout = restRequest.paramAsTime(StopDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName(), - request.getTimeout()); - request.setTimeout(timeout); - 
} - if (restRequest.hasParam(StopDataFrameAnalyticsAction.Request.ALLOW_NO_MATCH.getPreferredName())) { - request.setAllowNoMatch(restRequest.paramAsBoolean(StopDataFrameAnalyticsAction.Request.ALLOW_NO_MATCH.getPreferredName(), - request.allowNoMatch())); - } + request.setTimeout(restRequest.paramAsTime(StopDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName(), + request.getTimeout())); + request.setAllowNoMatch(restRequest.paramAsBoolean(StopDataFrameAnalyticsAction.Request.ALLOW_NO_MATCH.getPreferredName(), + request.allowNoMatch())); + request.setForce(restRequest.paramAsBoolean(StopDataFrameAnalyticsAction.Request.FORCE.getPreferredName(), request.isForce())); } return channel -> client.execute(StopDataFrameAnalyticsAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsActionTests.java new file mode 100644 index 0000000000000..2f28463c5206a --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsActionTests.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.Version; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; + +public class TransportStopDataFrameAnalyticsActionTests extends ESTestCase { + + public void testFindAnalyticsToStop_GivenOneFailedTaskAndNotForce() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED); + addAnalyticsTask(tasksBuilder, "reindexing", "foo-node", DataFrameAnalyticsState.REINDEXING); + addAnalyticsTask(tasksBuilder, "analyzing", "foo-node", DataFrameAnalyticsState.ANALYZING); + addAnalyticsTask(tasksBuilder, "stopping", "foo-node", DataFrameAnalyticsState.STOPPING); + addAnalyticsTask(tasksBuilder, "stopped", "foo-node", DataFrameAnalyticsState.STOPPED); + addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED); + + Set ids = new HashSet<>(Arrays.asList("started", "reindexing", "analyzing", "stopping", "stopped", "failed")); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, false)); + + assertThat(e.status(), equalTo(RestStatus.CONFLICT)); + assertThat(e.getMessage(), equalTo("cannot close data frame analytics 
[failed] because it failed, use force stop instead")); + } + + public void testFindAnalyticsToStop_GivenTwoFailedTasksAndNotForce() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED); + addAnalyticsTask(tasksBuilder, "another_failed", "foo-node", DataFrameAnalyticsState.FAILED); + + Set ids = new HashSet<>(Arrays.asList("failed", "another_failed")); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, false)); + + assertThat(e.status(), equalTo(RestStatus.CONFLICT)); + assertThat(e.getMessage(), equalTo("one or more data frame analytics are in failed state, use force stop instead")); + } + + public void testFindAnalyticsToStop_GivenFailedTaskAndForce() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED); + addAnalyticsTask(tasksBuilder, "reindexing", "foo-node", DataFrameAnalyticsState.REINDEXING); + addAnalyticsTask(tasksBuilder, "analyzing", "foo-node", DataFrameAnalyticsState.ANALYZING); + addAnalyticsTask(tasksBuilder, "stopping", "foo-node", DataFrameAnalyticsState.STOPPING); + addAnalyticsTask(tasksBuilder, "stopped", "foo-node", DataFrameAnalyticsState.STOPPED); + addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED); + + Set ids = new HashSet<>(Arrays.asList("started", "reindexing", "analyzing", "stopping", "stopped", "failed")); + + Set analyticsToStop = TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, true); + + assertThat(analyticsToStop, containsInAnyOrder("started", "reindexing", "analyzing", "failed")); + } + + private static void addAnalyticsTask(PersistentTasksCustomMetaData.Builder builder, 
String analyticsId, String nodeId, + DataFrameAnalyticsState state) { + builder.addTask(MlTasks.dataFrameAnalyticsTaskId(analyticsId), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, + new StartDataFrameAnalyticsAction.TaskParams(analyticsId, Version.CURRENT), + new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); + + builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(analyticsId), + new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId(), null)); + + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java index ad69cbc972993..f0388abd5d5fd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java @@ -566,10 +566,11 @@ static void addDataFrameAnalyticsJobTask(String id, String nodeId, DataFrameAnal static void addDataFrameAnalyticsJobTask(String id, String nodeId, DataFrameAnalyticsState state, PersistentTasksCustomMetaData.Builder builder, boolean isStale) { builder.addTask(MlTasks.dataFrameAnalyticsTaskId(id), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, - new StartDataFrameAnalyticsAction.TaskParams(id), new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); + new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT), + new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); if (state != null) { builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(id), - new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId() - (isStale ? 1 : 0))); + new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId() - (isStale ? 
1 : 0), null)); } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java index 1dea073123ad2..429575902b0b8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.process; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; @@ -199,7 +200,7 @@ private PersistentTasksCustomMetaData.PersistentTask ma private PersistentTasksCustomMetaData.PersistentTask makeTestDataFrameAnalyticsTask(String id) { return new PersistentTasksCustomMetaData.PersistentTask<>(MlTasks.dataFrameAnalyticsTaskId(id), - MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, new StartDataFrameAnalyticsAction.TaskParams(id), 0, + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT), 0, PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json index 962e4e391a045..2aca8f52fafe8 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json @@ -18,6 +18,11 @@ "required": false, "description": "Whether to ignore if a wildcard expression matches no data frame analytics. 
(This includes `_all` string or when no data frame analytics have been specified)" }, + "force": { + "type": "boolean", + "required": false, + "description": "True if the data frame analytics should be forcefully closed" + }, "timeout": { "type": "time", "required": false, From 1bca37ce73672ecf0639b81a016466ed0e68e554 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 2 Jul 2019 18:33:55 +0300 Subject: [PATCH 2/3] Address first review comments --- .../ml/dataframe/DataFrameAnalyticsStats.java | 24 ++++++++++++++----- .../client/MachineLearningIT.java | 1 + .../DataFrameAnalyticsStatsTests.java | 4 ++++ .../api/ml.stop_data_frame_analytics.json | 2 +- 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java index 5c652f33edb2e..4e04204e65021 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java @@ -41,6 +41,7 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws static final ParseField ID = new ParseField("id"); static final ParseField STATE = new ParseField("state"); + static final ParseField FAILURE_REASON = new ParseField("failure_reason"); static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent"); static final ParseField NODE = new ParseField("node"); static final ParseField ASSIGNMENT_EXPLANATION = new ParseField("assignment_explanation"); @@ -50,9 +51,10 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws args -> new DataFrameAnalyticsStats( (String) args[0], (DataFrameAnalyticsState) args[1], - (Integer) args[2], - (NodeAttributes) args[3], - (String) args[4])); + (String) args[2], + 
(Integer) args[3], + (NodeAttributes) args[4], + (String) args[5])); static { PARSER.declareString(constructorArg(), ID); @@ -62,6 +64,7 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws } throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); }, STATE, ObjectParser.ValueType.STRING); + PARSER.declareString(optionalConstructorArg(), FAILURE_REASON); PARSER.declareInt(optionalConstructorArg(), PROGRESS_PERCENT); PARSER.declareObject(optionalConstructorArg(), NodeAttributes.PARSER, NODE); PARSER.declareString(optionalConstructorArg(), ASSIGNMENT_EXPLANATION); @@ -69,14 +72,17 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws private final String id; private final DataFrameAnalyticsState state; + private final String failureReason; private final Integer progressPercent; private final NodeAttributes node; private final String assignmentExplanation; - public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable Integer progressPercent, - @Nullable NodeAttributes node, @Nullable String assignmentExplanation) { + public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable String failureReason, + @Nullable Integer progressPercent, @Nullable NodeAttributes node, + @Nullable String assignmentExplanation) { this.id = id; this.state = state; + this.failureReason = failureReason; this.progressPercent = progressPercent; this.node = node; this.assignmentExplanation = assignmentExplanation; @@ -90,6 +96,10 @@ public DataFrameAnalyticsState getState() { return state; } + public String getFailureReason() { + return failureReason; + } + public Integer getProgressPercent() { return progressPercent; } @@ -110,6 +120,7 @@ public boolean equals(Object o) { DataFrameAnalyticsStats other = (DataFrameAnalyticsStats) o; return Objects.equals(id, other.id) && Objects.equals(state, other.state) + && Objects.equals(failureReason, other.failureReason) && 
Objects.equals(progressPercent, other.progressPercent) && Objects.equals(node, other.node) && Objects.equals(assignmentExplanation, other.assignmentExplanation); @@ -117,7 +128,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(id, state, progressPercent, node, assignmentExplanation); + return Objects.hash(id, state, failureReason, progressPercent, node, assignmentExplanation); } @Override @@ -125,6 +136,7 @@ public String toString() { return new ToStringBuilder(getClass()) .add("id", id) .add("state", state) + .add("failureReason", failureReason) .add("progressPercent", progressPercent) .add("node", node) .add("assignmentExplanation", assignmentExplanation) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index 77efe43b2e174..e44883823c271 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -1359,6 +1359,7 @@ public void testGetDataFrameAnalyticsStats() throws Exception { DataFrameAnalyticsStats stats = statsResponse.getAnalyticsStats().get(0); assertThat(stats.getId(), equalTo(configId)); assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STOPPED)); + assertNull(stats.getFailureReason()); assertNull(stats.getProgressPercent()); assertNull(stats.getNode()); assertNull(stats.getAssignmentExplanation()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java index ed6e24f754d19..fad02eac161c7 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java +++ 
b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java @@ -43,6 +43,7 @@ public static DataFrameAnalyticsStats randomDataFrameAnalyticsStats() { return new DataFrameAnalyticsStats( randomAlphaOfLengthBetween(1, 10), randomFrom(DataFrameAnalyticsState.values()), + randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? null : randomIntBetween(0, 100), randomBoolean() ? null : NodeAttributesTests.createRandom(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)); @@ -52,6 +53,9 @@ public static void toXContent(DataFrameAnalyticsStats stats, XContentBuilder bui builder.startObject(); builder.field(DataFrameAnalyticsStats.ID.getPreferredName(), stats.getId()); builder.field(DataFrameAnalyticsStats.STATE.getPreferredName(), stats.getState().value()); + if (stats.getFailureReason() != null) { + builder.field(DataFrameAnalyticsStats.FAILURE_REASON.getPreferredName(), stats.getFailureReason()); + } if (stats.getProgressPercent() != null) { builder.field(DataFrameAnalyticsStats.PROGRESS_PERCENT.getPreferredName(), stats.getProgressPercent()); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json index 2aca8f52fafe8..ee1a9c85be424 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json @@ -21,7 +21,7 @@ "force": { "type": "boolean", "required": false, - "description": "True if the data frame analytics should be forcefully closed" + "description": "True if the data frame analytics should be forcefully stopped" }, "timeout": { "type": "time", From 2ad2d8bfb8f030f395ac8fb318b980dd993eb2b2 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 2 Jul 2019 19:56:36 +0300 Subject: [PATCH 3/3] Fix HLRC doc issue with reference to force --- 
.../client/documentation/MlClientDocumentationIT.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 7c5f0b8ccf6bf..93f196212ca0c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -3110,11 +3110,8 @@ public void testStopDataFrameAnalytics() throws Exception { { // tag::stop-data-frame-analytics-request StopDataFrameAnalyticsRequest request = new StopDataFrameAnalyticsRequest("my-analytics-config"); // <1> - // end::stop-data-frame-analytics-request - - //tag::stop-data-frame-analytics-request-force request.setForce(false); // <2> - //end::stop-data-frame-analytics-request-force + // end::stop-data-frame-analytics-request // tag::stop-data-frame-analytics-execute StopDataFrameAnalyticsResponse response = client.machineLearning().stopDataFrameAnalytics(request, RequestOptions.DEFAULT);