diff --git a/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc
index 3d425879adde6..f2e6b2f5dbdbb 100644
--- a/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc
+++ b/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc
@@ -59,6 +59,24 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=exclude-generated]
 The API returns an array of {anomaly-job} resources. For the full list of
 properties, see <>.
 
+//Begin blocked
+`blocked`::
+(object) When present, it indicates that a task is being executed on the job
+and is blocking the job from opening.
++
+.Properties of `blocked`
+[%collapsible%open]
+====
+`reason`:::
+(string) The reason the job is blocked. Valid values are `delete`, `reset`, and
+`revert`. Each value means that the corresponding action is being executed.
+
+`task_id`:::
+(string) The task ID of the blocking action. You can use the <> API to
+monitor progress.
+====
+//End blocked
+
 `create_time`::
 (string) The time the job was created. For example, `1491007356077`. This
 property is informational; you cannot change its value.
diff --git a/docs/reference/ml/anomaly-detection/apis/index.asciidoc b/docs/reference/ml/anomaly-detection/apis/index.asciidoc
index eebae76f9b9cf..750457471662d 100644
--- a/docs/reference/ml/anomaly-detection/apis/index.asciidoc
+++ b/docs/reference/ml/anomaly-detection/apis/index.asciidoc
@@ -46,6 +46,8 @@ include::open-job.asciidoc[leveloffset=+2]
 include::post-data.asciidoc[leveloffset=+2]
 //PREVIEW
 include::preview-datafeed.asciidoc[leveloffset=+2]
+//RESET
+include::reset-job.asciidoc[leveloffset=+2]
 //REVERT
 include::revert-snapshot.asciidoc[leveloffset=+2]
 //SET/START/STOP
diff --git a/docs/reference/ml/anomaly-detection/apis/ml-apis.asciidoc b/docs/reference/ml/anomaly-detection/apis/ml-apis.asciidoc
index 8c48a4a0e5696..4143eea7d5fe2 100644
--- a/docs/reference/ml/anomaly-detection/apis/ml-apis.asciidoc
+++ b/docs/reference/ml/anomaly-detection/apis/ml-apis.asciidoc
@@ -19,6 +19,7 @@ See also <>.
 * <>
 * <>
 * <> or <>
+* <>
 
 [discrete]
diff --git a/docs/reference/ml/anomaly-detection/apis/reset-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/reset-job.asciidoc
new file mode 100644
index 0000000000000..204e80321c2e7
--- /dev/null
+++ b/docs/reference/ml/anomaly-detection/apis/reset-job.asciidoc
@@ -0,0 +1,86 @@
+[role="xpack"]
+[testenv="platinum"]
+[[ml-reset-job]]
+= Reset {anomaly-jobs} API
+++++
+Reset jobs
+++++
+
+Resets an existing {anomaly-job}.
+
+[[ml-reset-job-request]]
+== {api-request-title}
+
+`POST _ml/anomaly_detectors/<job_id>/_reset`
+
+[[ml-reset-job-prereqs]]
+== {api-prereq-title}
+
+* Requires the `manage_ml` cluster privilege. This privilege is included in the
+`machine_learning_admin` built-in role.
+* Before you can reset a job, you must close it. You can set `force` to `true`
+when closing the job to avoid waiting for the job to complete. See
+<>.
+
+[[ml-reset-job-desc]]
+== {api-description-title}
+
+All model state and results are deleted.
+The job is ready to start over as if it had just been created.
+
+It is not currently possible to reset multiple jobs using wildcards or a
+comma-separated list.
+
+[[ml-reset-job-path-parms]]
+== {api-path-parms-title}
+
+`<job_id>`::
+(Required, string)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-anomaly-detection]
+
+[[ml-reset-job-query-parms]]
+== {api-query-parms-title}
+
+`wait_for_completion`::
+  (Optional, Boolean) Specifies whether the request should return immediately or
+  wait until the job reset completes.
Defaults to `true`. + +[[ml-reset-job-example]] +== {api-examples-title} + +[source,console] +-------------------------------------------------- +POST _ml/anomaly_detectors/total-requests/_reset +-------------------------------------------------- +// TEST[skip:setup:server_metrics_job] + +When the job is reset, you receive the following results: + +[source,console-result] +---- +{ + "acknowledged": true +} +---- + +In the next example we reset the `total-requests` job asynchronously: + +[source,console] +-------------------------------------------------- +POST _ml/anomaly_detectors/total-requests/_reset?wait_for_completion=false +-------------------------------------------------- +// TEST[skip:setup:server_metrics_job] + +When `wait_for_completion` is set to `false`, the response contains the id +of the job reset task: + +[source,console-result] +---- +{ + "task": "oTUltX4IQMOUUVeiohTt8A:39" +} +---- +// TESTRESPONSE[s/"task": "oTUltX4IQMOUUVeiohTt8A:39"/"task": $body.task/] + +If you want to check the status of the reset task, use the <> by referencing +the task ID. diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/ml.reset_job.json b/rest-api-spec/src/main/resources/rest-api-spec/api/ml.reset_job.json new file mode 100644 index 0000000000000..75d2ae410e803 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/ml.reset_job.json @@ -0,0 +1,36 @@ +{ + "ml.reset_job":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-reset-job.html", + "description":"Resets an existing anomaly detection job." + }, + "stability":"stable", + "visibility":"public", + "headers":{ + "accept": [ "application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_ml/anomaly_detectors/{job_id}/_reset", + "methods":[ + "POST" + ], + "parts":{ + "job_id":{ + "type":"string", + "description":"The ID of the job to reset" + } + } + } + ] + }, + "params":{ + "wait_for_completion":{ + "type":"boolean", + "description":"Should this request wait until the operation has completed before returning", + "default":true + } + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetJobAction.java new file mode 100644 index 0000000000000..a6dcd1636985b --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetJobAction.java @@ -0,0 +1,118 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class ResetJobAction extends ActionType { + + public static final String NAME = "cluster:admin/xpack/ml/job/reset"; + public static final ResetJobAction INSTANCE = new ResetJobAction(); + + public static final Version VERSION_INTRODUCED = Version.V_7_14_0; + + private ResetJobAction() { + super(NAME, AcknowledgedResponse::readFrom); + } + + public static class Request extends AcknowledgedRequest { + + private String jobId; + + /** + * Internal parameter that allows resetting an open job + * when a job is reallocated to a new node. + */ + private boolean skipJobStateValidation; + + /** + * Should this task store its result? + */ + private boolean shouldStoreResult; + + public Request(String jobId) { + this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID); + } + + public Request(StreamInput in) throws IOException { + super(in); + jobId = in.readString(); + skipJobStateValidation = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(jobId); + out.writeBoolean(skipJobStateValidation); + } + + public void setSkipJobStateValidation(boolean skipJobStateValidation) { + this.skipJobStateValidation = skipJobStateValidation; + } + + public boolean isSkipJobStateValidation() { + return skipJobStateValidation; + } + + /** + * Should this task store its result after it has finished? 
+ */ + public void setShouldStoreResult(boolean shouldStoreResult) { + this.shouldStoreResult = shouldStoreResult; + } + + @Override + public boolean getShouldStoreResult() { + return shouldStoreResult; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, MlTasks.JOB_TASK_ID_PREFIX + jobId, parentTaskId, headers); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getJobId() { + return jobId; + } + + @Override + public int hashCode() { + return Objects.hash(jobId, skipJobStateValidation); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || o.getClass() != getClass()) return false; + Request that = (Request) o; + return Objects.equals(jobId, that.jobId) && skipJobStateValidation == that.skipJobStateValidation; + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Blocked.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Blocked.java new file mode 100644 index 0000000000000..757168f351cf0 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Blocked.java @@ -0,0 +1,111 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.ml.job.config; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ParseField; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Locale; +import java.util.Objects; + +public class Blocked implements ToXContentObject, Writeable { + + public enum Reason { + NONE, DELETE, RESET, REVERT; + + public static Reason fromString(String value) { + return Reason.valueOf(value.toUpperCase(Locale.ROOT)); + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + } + + public static final ParseField REASON = new ParseField("reason"); + public static final ParseField TASK_ID = new ParseField("task_id"); + + public static final ConstructingObjectParser LENIENT_PARSER = createParser(true); + public static final ConstructingObjectParser STRICT_PARSER = createParser(false); + + private static ConstructingObjectParser createParser(boolean ignoreUnknownFields) { + ConstructingObjectParser parser = new ConstructingObjectParser<>("blocked", ignoreUnknownFields, + a -> new Blocked((Reason) a[0], (TaskId) a[1])); + parser.declareString(ConstructingObjectParser.constructorArg(), Reason::fromString, REASON); + parser.declareString(ConstructingObjectParser.optionalConstructorArg(), TaskId::new, TASK_ID); + return parser; + } + + private final Reason reason; + + @Nullable + private final TaskId taskId; + + public Blocked(Reason reason, @Nullable TaskId taskId) { + this.reason = Objects.requireNonNull(reason); + this.taskId = taskId; + } + + public Blocked(StreamInput in) 
throws IOException { + this.reason = in.readEnum(Reason.class); + this.taskId = in.readOptionalWriteable(TaskId::readFromStream); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeEnum(reason); + out.writeOptionalWriteable(taskId); + } + + public Reason getReason() { + return reason; + } + + @Nullable + public TaskId getTaskId() { + return taskId; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(REASON.getPreferredName(), reason); + if (taskId != null) { + builder.field(TASK_ID.getPreferredName(), taskId.toString()); + } + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(reason, taskId); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Blocked that = (Blocked) o; + return Objects.equals(reason, that.reason) && Objects.equals(taskId, that.taskId); + } + + public static Blocked none() { + return new Blocked(Reason.NONE, null); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index 3284b73ef9b63..9eec0417b99f3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -83,6 +83,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name"); public static final ParseField DELETING = new ParseField("deleting"); public static final ParseField ALLOW_LAZY_OPEN = new ParseField("allow_lazy_open"); + public static final ParseField BLOCKED = new ParseField("blocked"); // Used for QueryPage public static final ParseField RESULTS_FIELD = new ParseField("jobs"); @@ -136,6 +137,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie parser.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME); parser.declareBoolean(Builder::setDeleting, DELETING); parser.declareBoolean(Builder::setAllowLazyOpen, ALLOW_LAZY_OPEN); + parser.declareObject(Builder::setBlocked, ignoreUnknownFields ? 
Blocked.LENIENT_PARSER : Blocked.STRICT_PARSER, BLOCKED); return parser; } @@ -170,6 +172,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie private final String resultsIndexName; private final boolean deleting; private final boolean allowLazyOpen; + private final Blocked blocked; private Job(String jobId, String jobType, Version jobVersion, List groups, String description, Date createTime, Date finishedTime, @@ -177,7 +180,7 @@ private Job(String jobId, String jobType, Version jobVersion, List group ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long dailyModelSnapshotRetentionAfterDays, Long resultsRetentionDays, Map customSettings, String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, - boolean deleting, boolean allowLazyOpen) { + boolean deleting, boolean allowLazyOpen, Blocked blocked) { this.jobId = jobId; this.jobType = jobType; @@ -199,8 +202,19 @@ private Job(String jobId, String jobType, Version jobVersion, List group this.modelSnapshotId = modelSnapshotId; this.modelSnapshotMinVersion = modelSnapshotMinVersion; this.resultsIndexName = resultsIndexName; - this.deleting = deleting; this.allowLazyOpen = allowLazyOpen; + + if (deleting == false && blocked.getReason() == Blocked.Reason.DELETE) { + this.deleting = true; + } else { + this.deleting = deleting; + } + + if (deleting && blocked.getReason() != Blocked.Reason.DELETE) { + this.blocked = new Blocked(Blocked.Reason.DELETE, null); + } else { + this.blocked = blocked; + } } public Job(StreamInput in) throws IOException { @@ -231,6 +245,11 @@ public Job(StreamInput in) throws IOException { resultsIndexName = in.readString(); deleting = in.readBoolean(); allowLazyOpen = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + blocked = new Blocked(in); + } else { + blocked = deleting ? new Blocked(Blocked.Reason.DELETE, null) : Blocked.none(); + } } /** @@ -416,6 +435,10 @@ public boolean allowLazyOpen() { return allowLazyOpen; } + public Blocked getBlocked() { + return blocked; + } + /** * Get all input data fields mentioned in the job configuration, * namely analysis fields and the time field. 
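
A minimal, hypothetical sketch (not part of this change) of the two-way invariant that the `Job` constructor logic above enforces between the legacy `deleting` flag and the new `blocked` field. It mirrors the `testDeletingAndBlockReasonAreSynced` case added to `JobTests` later in this diff; the class name, job ids, detector, and bucket span are illustrative, and the class is declared in the production package purely so the sketch can use the builder setters directly:

[source,java]
----
// Hypothetical sketch, placed in the same package purely for illustration.
package org.elasticsearch.xpack.core.ml.job.config;

import org.elasticsearch.core.TimeValue;

import java.util.Collections;
import java.util.Date;

public class BlockedDeletingSyncSketch {

    public static void main(String[] args) {
        // Setting only the legacy flag yields a DELETE block reason...
        Job deletingJob = newJobBuilder("deleting-sketch").setDeleting(true).build();
        assert deletingJob.getBlocked().getReason() == Blocked.Reason.DELETE;

        // ...and a DELETE block reason marks the job as deleting.
        Job blockedJob = newJobBuilder("blocked-sketch")
            .setBlocked(new Blocked(Blocked.Reason.DELETE, null))
            .build();
        assert blockedJob.isDeleting();

        // A job that is neither deleting nor blocked reports Blocked.none().
        assert newJobBuilder("plain-sketch").build().getBlocked().equals(Blocked.none());
    }

    private static Job.Builder newJobBuilder(String id) {
        // Minimal job configuration; the detector and bucket span are illustrative.
        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
            Collections.singletonList(new Detector.Builder("count", null).build()));
        analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
        Job.Builder builder = new Job.Builder(id);
        builder.setAnalysisConfig(analysisConfig);
        builder.setDataDescription(new DataDescription.Builder());
        builder.setCreateTime(new Date());
        return builder;
    }
}
----
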
@@ -503,6 +526,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(resultsIndexName); out.writeBoolean(deleting); out.writeBoolean(allowLazyOpen); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + blocked.writeTo(out); + } } @Override @@ -578,6 +604,9 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th } builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName); builder.field(ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen); + if (blocked.getReason() != Blocked.Reason.NONE) { + builder.field(BLOCKED.getPreferredName(), blocked); + } return builder; } @@ -613,7 +642,8 @@ public boolean equals(Object other) { && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.resultsIndexName, that.resultsIndexName) && Objects.equals(this.deleting, that.deleting) - && Objects.equals(this.allowLazyOpen, that.allowLazyOpen); + && Objects.equals(this.allowLazyOpen, that.allowLazyOpen) + && Objects.equals(this.blocked, that.blocked); } @Override @@ -621,7 +651,7 @@ public int hashCode() { return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays, - customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen); + customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen, blocked); } // Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString() @@ -671,6 +701,7 @@ public static class Builder implements Writeable { private String resultsIndexName; private boolean deleting; private boolean allowLazyOpen; + private Blocked blocked = Blocked.none(); public Builder() { } @@ -702,6 +733,7 @@ public Builder(Job job) { this.resultsIndexName = job.getResultsIndexNameNoPrefix(); this.deleting = job.isDeleting(); this.allowLazyOpen = job.allowLazyOpen(); + this.blocked = job.getBlocked(); } public Builder(StreamInput in) throws IOException { @@ -731,6 +763,11 @@ public Builder(StreamInput in) throws IOException { resultsIndexName = in.readOptionalString(); deleting = in.readBoolean(); allowLazyOpen = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + blocked = new Blocked(in); + } else { + blocked = Blocked.none(); + } } public Builder setId(String id) { @@ -865,6 +902,11 @@ public Builder setAllowLazyOpen(boolean allowLazyOpen) { return this; } + public Builder setBlocked(Blocked blocked) { + this.blocked = ExceptionsHelper.requireNonNull(blocked, BLOCKED); + return this; + } + /** * Return the list of fields that have been set and are invalid to * be set when the job is created e.g. 
model snapshot Id should not @@ -930,9 +972,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(resultsIndexName); out.writeBoolean(deleting); out.writeBoolean(allowLazyOpen); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + blocked.writeTo(out); + } } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; @@ -959,7 +1003,8 @@ public boolean equals(Object o) { && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.resultsIndexName, that.resultsIndexName) && Objects.equals(this.deleting, that.deleting) - && Objects.equals(this.allowLazyOpen, that.allowLazyOpen); + && Objects.equals(this.allowLazyOpen, that.allowLazyOpen) + && Objects.equals(this.blocked, that.blocked); } @Override @@ -967,7 +1012,7 @@ public int hashCode() { return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription, createTime, finishedTime, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays, - customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen); + customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen, blocked); } /** @@ -1126,7 +1171,7 @@ public Job build() { id, jobType, jobVersion, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays, - customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen); + customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen, blocked); } private void checkValidBackgroundPersistInterval() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index 1a93a51bb292a..ea040ad2c7e09 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -66,6 +66,7 @@ public class JobUpdate implements Writeable, ToXContentObject { INTERNAL_PARSER.declareString(Builder::setModelSnapshotMinVersion, Job.MODEL_SNAPSHOT_MIN_VERSION); INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION); INTERNAL_PARSER.declareBoolean(Builder::setClearFinishTime, CLEAR_JOB_FINISH_TIME); + INTERNAL_PARSER.declareObject(Builder::setBlocked, Blocked.STRICT_PARSER, Job.BLOCKED); } private final String jobId; @@ -87,6 +88,7 @@ public class JobUpdate implements Writeable, ToXContentObject { private final Version jobVersion; private final Boolean clearJobFinishTime; private final Boolean allowLazyOpen; + private final Blocked blocked; private JobUpdate(String jobId, @Nullable List groups, @Nullable String description, @Nullable List detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig, @@ -97,7 +99,7 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String @Nullable PerPartitionCategorizationConfig perPartitionCategorizationConfig, @Nullable Map customSettings, @Nullable String modelSnapshotId, @Nullable Version 
modelSnapshotMinVersion, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime, - @Nullable Boolean allowLazyOpen) { + @Nullable Boolean allowLazyOpen, @Nullable Blocked blocked) { this.jobId = jobId; this.groups = groups; this.description = description; @@ -117,6 +119,7 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String this.jobVersion = jobVersion; this.clearJobFinishTime = clearJobFinishTime; this.allowLazyOpen = allowLazyOpen; + this.blocked = blocked; } public JobUpdate(StreamInput in) throws IOException { @@ -156,6 +159,11 @@ public JobUpdate(StreamInput in) throws IOException { modelSnapshotMinVersion = null; } allowLazyOpen = in.readOptionalBoolean(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + blocked = in.readOptionalWriteable(Blocked::new); + } else { + blocked = null; + } } @Override @@ -196,6 +204,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(false); } out.writeOptionalBoolean(allowLazyOpen); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalWriteable(blocked); + } } public String getJobId() { @@ -278,6 +289,10 @@ public boolean isAutodetectProcessUpdate() { return modelPlotConfig != null || perPartitionCategorizationConfig != null || detectorUpdates != null || groups != null; } + public Blocked getBlocked() { + return blocked; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -336,6 +351,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (allowLazyOpen != null) { builder.field(Job.ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen); } + if (blocked != null) { + builder.field(Job.BLOCKED.getPreferredName(), blocked); + } builder.endObject(); return builder; } @@ -485,6 +503,9 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) { if (allowLazyOpen != null) { builder.setAllowLazyOpen(allowLazyOpen); } + if (blocked != null) { + builder.setBlocked(blocked); + } builder.setAnalysisConfig(newAnalysisConfig); return builder.build(); @@ -511,7 +532,8 @@ && updatesDetectors(job) == false && (modelSnapshotMinVersion == null || Objects.equals(modelSnapshotMinVersion, job.getModelSnapshotMinVersion())) && (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion())) && (clearJobFinishTime == null || clearJobFinishTime == false || job.getFinishedTime() == null) - && (allowLazyOpen == null || Objects.equals(allowLazyOpen, job.allowLazyOpen())); + && (allowLazyOpen == null || Objects.equals(allowLazyOpen, job.allowLazyOpen())) + && (blocked == null || Objects.equals(blocked, job.getBlocked())); } boolean updatesDetectors(Job job) { @@ -562,7 +584,8 @@ public boolean equals(Object other) { && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.jobVersion, that.jobVersion) && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime) - && Objects.equals(this.allowLazyOpen, that.allowLazyOpen); + && Objects.equals(this.allowLazyOpen, that.allowLazyOpen) + && Objects.equals(this.blocked, that.blocked); } @Override @@ -570,7 +593,7 @@ public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays, categorizationFilters, perPartitionCategorizationConfig, customSettings, modelSnapshotId, 
modelSnapshotMinVersion, - jobVersion, clearJobFinishTime, allowLazyOpen); + jobVersion, clearJobFinishTime, allowLazyOpen, blocked); } public static class DetectorUpdate implements Writeable, ToXContentObject { @@ -586,9 +609,9 @@ public static class DetectorUpdate implements Writeable, ToXContentObject { DetectionRule.STRICT_PARSER.apply(parser, parseFieldMatcher).build(), Detector.CUSTOM_RULES_FIELD); } - private int detectorIndex; - private String description; - private List rules; + private final int detectorIndex; + private final String description; + private final List rules; public DetectorUpdate(int detectorIndex, String description, List rules) { this.detectorIndex = detectorIndex; @@ -685,6 +708,7 @@ public static class Builder { private Version jobVersion; private Boolean clearJobFinishTime; private Boolean allowLazyOpen; + private Blocked blocked; public Builder(String jobId) { this.jobId = jobId; @@ -795,11 +819,16 @@ public Builder setClearFinishTime(boolean clearJobFinishTime) { return this; } + public Builder setBlocked(Blocked blocked) { + this.blocked = blocked; + return this; + } + public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, categorizationFilters, perPartitionCategorizationConfig, customSettings, modelSnapshotId, modelSnapshotMinVersion, - jobVersion, clearJobFinishTime, allowLazyOpen); + jobVersion, clearJobFinishTime, allowLazyOpen, blocked); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index b86060dae5114..3ac456e41d9bb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -138,6 +138,7 @@ public final class Messages { public static final String JOB_AUDIT_CREATED = "Job created"; public static final String JOB_AUDIT_UPDATED = "Job updated: {0}"; public static final String JOB_AUDIT_CLOSING = "Job is closing"; + public static final String JOB_AUDIT_RESET = "Job has been reset"; public static final String JOB_AUDIT_FORCE_CLOSING = "Job is closing (forced)"; public static final String JOB_AUDIT_DATAFEED_CONTINUED_REALTIME = "Datafeed continued in real-time"; public static final String JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR = "Datafeed is encountering errors submitting data for analysis: {0}"; @@ -263,6 +264,7 @@ public final class Messages { public static final String REST_INVALID_FLUSH_PARAMS_MISSING = "Invalid flush parameters: ''{0}'' has not been specified."; public static final String REST_INVALID_FLUSH_PARAMS_UNEXPECTED = "Invalid flush parameters: unexpected ''{0}''."; public static final String REST_JOB_NOT_CLOSED_REVERT = "Can only revert to a model snapshot when the job is closed."; + public static final String REST_JOB_NOT_CLOSED_RESET = "Can only reset a job when it is closed."; public static final String REST_NO_SUCH_MODEL_SNAPSHOT = "No model snapshot with id [{0}] exists for job [{1}]"; public static final String REST_START_AFTER_END = "Invalid time range: end time ''{0}'' is earlier than start time ''{1}''."; public static final String REST_NO_SUCH_FORECAST = "No forecast(s) [{0}] exists for job [{1}]"; diff --git 
a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json index 68a063acbbb10..6664f4959e6e1 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json @@ -291,6 +291,16 @@ "background_persist_interval" : { "type" : "keyword" }, + "blocked": { + "properties": { + "reason": { + "type": "keyword" + }, + "task_id": { + "type": "keyword" + } + } + }, "chunking_config" : { "properties" : { "mode" : { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/ResetJobRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/ResetJobRequestTests.java new file mode 100644 index 0000000000000..fba559fec8c22 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/ResetJobRequestTests.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class ResetJobRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected ResetJobAction.Request createTestInstance() { + ResetJobAction.Request request = new ResetJobAction.Request(randomAlphaOfLength(10)); + request.setShouldStoreResult(randomBoolean()); + request.setSkipJobStateValidation(randomBoolean()); + return request; + } + + @Override + protected Writeable.Reader instanceReader() { + return ResetJobAction.Request::new; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/BlockedTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/BlockedTests.java new file mode 100644 index 0000000000000..d1052a64332ae --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/BlockedTests.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.core.ml.job.config; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; + +public class BlockedTests extends AbstractSerializingTestCase { + + @Override + protected Blocked doParseInstance(XContentParser parser) throws IOException { + return Blocked.STRICT_PARSER.apply(parser, null); + } + + @Override + protected Writeable.Reader instanceReader() { + return Blocked::new; + } + + @Override + protected Blocked createTestInstance() { + return createRandom(); + } + + public static Blocked createRandom() { + Blocked.Reason reason = randomFrom(Blocked.Reason.values()); + TaskId taskId = (reason != Blocked.Reason.NONE && randomBoolean()) ? + new TaskId(randomAlphaOfLength(10) + ":" + randomNonNegativeLong()) : null; + return new Blocked(reason, taskId); + } + + public void testReasonFromString() { + assertThat(Blocked.Reason.fromString("NonE"), equalTo(Blocked.Reason.NONE)); + assertThat(Blocked.Reason.fromString("dElETe"), equalTo(Blocked.Reason.DELETE)); + assertThat(Blocked.Reason.fromString("ReSEt"), equalTo(Blocked.Reason.RESET)); + assertThat(Blocked.Reason.fromString("reVERt"), equalTo(Blocked.Reason.REVERT)); + } + + public void testReasonToString() { + List asStrings = Arrays.stream(Blocked.Reason.values()).map(Blocked.Reason::toString).collect(Collectors.toList()); + assertThat(asStrings, contains("none", "delete", "reset", "revert")); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java index 6d329c49825f5..384b8e72ec981 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentParseException; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MachineLearningField; @@ -594,6 +595,42 @@ public void testDocumentId() { assertNull(Job.extractJobIdFromDocumentId("some_other_type-foo")); } + public void testDeletingAndBlockReasonAreSynced() { + { + Job job = buildJobBuilder(randomValidJobId()) + .setDeleting(true) + .build(); + assertThat(job.getBlocked().getReason(), equalTo(Blocked.Reason.DELETE)); + } + { + Job job = buildJobBuilder(randomValidJobId()) + .setBlocked(new Blocked(Blocked.Reason.DELETE, null)) + .build(); + assertThat(job.isDeleting(), is(true)); + } + } + + public void testParseJobWithDeletingButWithoutBlockReason() throws IOException { + String jobWithDeleting = "{\n" + + " \"job_id\": \"deleting_job\",\n" + + " \"create_time\": 1234567890000,\n" + + " \"analysis_config\": {\n" + + " \"bucket_span\": \"1h\",\n" + + " \"detectors\": [{\"function\": \"count\"}]\n" + + " },\n" + + " \"data_description\": {\n" + + " \"time_field\": \"time\"\n" + + " },\n" + + " 
\"deleting\": true\n" + + "}"; + + try (XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS, jobWithDeleting)) { + Job job = doParseInstance(parser); + assertThat(job.getBlocked().getReason(), equalTo(Blocked.Reason.DELETE)); + } + } + public static Job.Builder buildJobBuilder(String id, Date date) { Job.Builder builder = new Job.Builder(id); builder.setCreateTime(date); @@ -686,6 +723,9 @@ public static Job createRandomizedJob() { if (randomBoolean()) { builder.setAllowLazyOpen(randomBoolean()); } + if (randomBoolean()) { + builder.setBlocked(BlockedTests.createRandom()); + } return builder.build(); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index b6c15f2259f61..7bc2b5e41c512 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -130,6 +130,9 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) { if (randomBoolean()) { update.setAllowLazyOpen(randomBoolean()); } + if (useInternalParser && randomBoolean()) { + update.setBlocked(BlockedTests.createRandom()); + } return update.build(); } diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle index 72f34486ff2cb..a3eec00a9de30 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle +++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle @@ -196,6 +196,8 @@ tasks.named("yamlRestTest").configure { 'ml/preview_datafeed/Test preview missing datafeed', 'ml/preview_datafeed/Test preview with datafeed_id and job config', 'ml/preview_datafeed/Test preview with datafeed id and config', + 'ml/reset_job/Test reset given job is open', + 'ml/reset_job/Test reset given unknown job id', 'ml/revert_model_snapshot/Test revert model with invalid snapshotId', 'ml/start_data_frame_analytics/Test start given missing source index', 'ml/start_data_frame_analytics/Test start outlier_detection given source index has no fields', diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java index 499c4c99070bf..607468375e0da 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java @@ -10,10 +10,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.Blocked; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import 
org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -65,8 +67,10 @@ public void testTriggerDeleteJobsInStateDeletingWithoutDeletionTask() throws Int blockingCall(maintenanceService::triggerDeleteJobsInStateDeletingWithoutDeletionTask); assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3")); - this.blockingCall(listener -> jobConfigProvider.markJobAsDeleting("maintenance-test-2", listener)); - this.blockingCall(listener -> jobConfigProvider.markJobAsDeleting("maintenance-test-3", listener)); + this.blockingCall(listener -> jobConfigProvider.updateJobBlockReason( + "maintenance-test-2", new Blocked(Blocked.Reason.DELETE, null), listener)); + this.blockingCall(listener -> jobConfigProvider.updateJobBlockReason( + "maintenance-test-3", new Blocked(Blocked.Reason.DELETE, null), listener)); assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3")); assertThat(getJob("maintenance-test-1").get(0).isDeleting(), is(false)); assertThat(getJob("maintenance-test-2").get(0).isDeleting(), is(true)); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index 150de7a83e745..a16b7cd92fdd5 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -43,6 +43,7 @@ import org.elasticsearch.xpack.core.ml.action.PutCalendarAction; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.ResetJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -123,6 +124,11 @@ protected AcknowledgedResponse deleteJob(String jobId) { return client().execute(DeleteJobAction.INSTANCE, request).actionGet(); } + protected AcknowledgedResponse resetJob(String jobId) { + ResetJobAction.Request request = new ResetJobAction.Request(jobId); + return client().execute(ResetJobAction.INSTANCE, request).actionGet(); + } + protected PutDatafeedAction.Response putDatafeed(DatafeedConfig datafeed) { PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeed); return client().execute(PutDatafeedAction.INSTANCE, request).actionGet(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index b087a5150158c..f3007a1b9956f 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -8,12 +8,6 @@ 
import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; -import org.elasticsearch.action.admin.indices.refresh.RefreshAction; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; -import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.core.Nullable; @@ -23,15 +17,14 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; -import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; +import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; @@ -62,10 +55,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -282,24 +273,6 @@ protected static void assertThatAuditMessagesMatch(String configId, String... 
ex }); } - private static List fetchAllAuditMessages(String dataFrameAnalyticsId) { - RefreshRequest refreshRequest = new RefreshRequest(NotificationsIndex.NOTIFICATIONS_INDEX); - RefreshResponse refreshResponse = client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet(); - assertThat(refreshResponse.getStatus().getStatus(), anyOf(equalTo(200), equalTo(201))); - - SearchRequest searchRequest = new SearchRequestBuilder(client(), SearchAction.INSTANCE) - .setIndices(NotificationsIndex.NOTIFICATIONS_INDEX) - .addSort("timestamp", SortOrder.ASC) - .setQuery(QueryBuilders.termQuery("job_id", dataFrameAnalyticsId)) - .setSize(100) - .request(); - SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet(); - - return Arrays.stream(searchResponse.getHits().getHits()) - .map(hit -> (String) hit.getSourceAsMap().get("message")) - .collect(Collectors.toList()); - } - protected static Set getTrainingRowsIds(String index) { Set trainingRowsIds = new HashSet<>(); SearchResponse hits = client().prepareSearch(index).setSize(10000).get(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index 8feab9433ae96..4b765d8a2b993 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -8,17 +8,19 @@ import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateAction; import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateRequest; -import org.elasticsearch.cluster.NamedDiff; -import org.elasticsearch.xpack.autoscaling.Autoscaling; -import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata; -import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; -import org.elasticsearch.xpack.core.action.CreateDataStreamAction; -import org.elasticsearch.xpack.transform.Transform; +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Template; @@ -29,6 +31,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.ingest.common.IngestCommonPlugin; import 
org.elasticsearch.license.LicenseService; @@ -43,10 +46,15 @@ import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptEngine; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.SecuritySettingsSourceField; import org.elasticsearch.transport.Netty4Plugin; +import org.elasticsearch.xpack.autoscaling.Autoscaling; +import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; import org.elasticsearch.xpack.core.ilm.DeleteAction; import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; @@ -81,6 +89,7 @@ import org.elasticsearch.xpack.ml.LocalStateMachineLearning; import org.elasticsearch.xpack.ml.autoscaling.MlScalingReason; import org.elasticsearch.xpack.ml.inference.ModelAliasMetadata; +import org.elasticsearch.xpack.transform.Transform; import java.io.IOException; import java.io.UncheckedIOException; @@ -96,6 +105,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; @@ -103,6 +113,8 @@ import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.elasticsearch.xpack.monitoring.MonitoringService.ELASTICSEARCH_COLLECTION_ENABLED; import static org.elasticsearch.xpack.security.test.SecurityTestUtils.writeFile; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; /** @@ -245,6 +257,24 @@ protected PutFilterAction.Response putMlFilter(MlFilter filter) { return client().execute(PutFilterAction.INSTANCE, new PutFilterAction.Request(filter)).actionGet(); } + protected static List fetchAllAuditMessages(String jobId) { + RefreshRequest refreshRequest = new RefreshRequest(NotificationsIndex.NOTIFICATIONS_INDEX); + RefreshResponse refreshResponse = client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet(); + assertThat(refreshResponse.getStatus().getStatus(), anyOf(equalTo(200), equalTo(201))); + + SearchRequest searchRequest = new SearchRequestBuilder(client(), SearchAction.INSTANCE) + .setIndices(NotificationsIndex.NOTIFICATIONS_INDEX) + .addSort("timestamp", SortOrder.ASC) + .setQuery(QueryBuilders.termQuery("job_id", jobId)) + .setSize(100) + .request(); + SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet(); + + return Arrays.stream(searchResponse.getHits().getHits()) + .map(hit -> (String) hit.getSourceAsMap().get("message")) + .collect(Collectors.toList()); + } + @Override protected void ensureClusterStateConsistency() throws IOException { if (cluster() != null && cluster().size() > 0) { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java new file mode 100644 index 0000000000000..6e9b07b9d3e3a --- /dev/null +++ 
b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.Blocked; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.junit.After; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class ResetJobIT extends MlNativeAutodetectIntegTestCase { + + @After + public void tearDownData() { + cleanUp(); + } + + public void testReset() throws IOException { + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + long startTime = 1514764800000L; + final int bucketCount = 100; + Job.Builder job = createJob("test-reset", bucketSpan); + + openJob(job.getId()); + postData(job.getId(), generateData(startTime, bucketSpan, bucketCount + 1, bucketIndex -> randomIntBetween(100, 200)) + .stream().collect(Collectors.joining())); + closeJob(job.getId()); + + List buckets = getBuckets(job.getId()); + assertThat(buckets.isEmpty(), is(false)); + + DataCounts dataCounts = getJobStats(job.getId()).get(0).getDataCounts(); + assertThat(dataCounts.getProcessedRecordCount(), greaterThan(0L)); + + resetJob(job.getId()); + + buckets = getBuckets(job.getId()); + assertThat(buckets.isEmpty(), is(true)); + + dataCounts = getJobStats(job.getId()).get(0).getDataCounts(); + assertThat(dataCounts.getProcessedRecordCount(), equalTo(0L)); + + Job jobAfterReset = getJob(job.getId()).get(0); + assertThat(jobAfterReset.getBlocked(), equalTo(Blocked.none())); + assertThat(jobAfterReset.getModelSnapshotId(), is(nullValue())); + assertThat(jobAfterReset.getFinishedTime(), is(nullValue())); + + List auditMessages = fetchAllAuditMessages(job.getId()); + assertThat(auditMessages.isEmpty(), is(false)); + assertThat(auditMessages.get(auditMessages.size() - 1), equalTo("Job has been reset")); + } + + private Job.Builder createJob(String jobId, TimeValue bucketSpan) { + Detector.Builder detector = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); + analysisConfig.setBucketSpan(bucketSpan); + Job.Builder job = new Job.Builder(jobId); + job.setAnalysisConfig(analysisConfig); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + job.setDataDescription(dataDescription); + putJob(job); + + return job; + } +} diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java 
b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java index bbc6c09d78986..2df0300c54ee8 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java @@ -384,7 +384,7 @@ public void testNewJobWithGlobalCalendar() throws Exception { assertEquals(0, buckets.get(2).getScheduledEvents().size()); } - private Job.Builder createJob(String jobId, TimeValue bucketSpan) { + private Job.Builder createJob(String jobId, TimeValue bucketSpan) { Detector.Builder detector = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); analysisConfig.setBucketSpan(bucketSpan); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index 8259b3c678ec0..f5542477a5f1e 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -21,8 +21,10 @@ import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.Blocked; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.Detector; @@ -415,8 +417,9 @@ public void testExpandJobIds_excludeDeleting() throws Exception { putJob(createJob("foo-deleting", null)); putJob(createJob("bar", null)); - Boolean marked = blockingCall(actionListener -> jobConfigProvider.markJobAsDeleting("foo-deleting", actionListener)); - assertTrue(marked); + PutJobAction.Response marked = blockingCall(actionListener -> jobConfigProvider.updateJobBlockReason( + "foo-deleting", new Blocked(Blocked.Reason.DELETE, null), actionListener)); + assertThat(marked.getResponse().getBlocked().getReason(), equalTo(Blocked.Reason.DELETE)); client().admin().indices().prepareRefresh(MlConfigIndex.indexName()).get(); @@ -559,27 +562,30 @@ public void testValidateDatafeedJob() throws Exception { assertEquals(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, exceptionHolder.get().getMessage()); } - public void testMarkAsDeleting() throws Exception { - AtomicReference responseHolder = new AtomicReference<>(); + public void testUpdateJobBlockReason() throws Exception { + AtomicReference responseHolder = new AtomicReference<>(); AtomicReference exceptionHolder = new AtomicReference<>(); - blockingCall(listener -> jobConfigProvider.markJobAsDeleting("missing-job", listener), responseHolder, exceptionHolder); + blockingCall(listener -> jobConfigProvider.updateJobBlockReason( + "missing-job", new Blocked(Blocked.Reason.RESET, null), listener), responseHolder, exceptionHolder); assertNull(responseHolder.get()); 
assertEquals(ResourceNotFoundException.class, exceptionHolder.get().getClass()); - String jobId = "mark-as-deleting-job"; + String jobId = "update-job-blocked-reset"; putJob(createJob(jobId, Collections.emptyList())); client().admin().indices().prepareRefresh(MlConfigIndex.indexName()).get(); exceptionHolder.set(null); - blockingCall(listener -> jobConfigProvider.markJobAsDeleting(jobId, listener), responseHolder, exceptionHolder); + blockingCall(listener -> jobConfigProvider.updateJobBlockReason( + jobId, new Blocked(Blocked.Reason.RESET, null), listener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); - assertTrue(responseHolder.get()); + assertThat(responseHolder.get().getResponse().getBlocked().getReason(), equalTo(Blocked.Reason.RESET)); // repeat the update for good measure - blockingCall(listener -> jobConfigProvider.markJobAsDeleting(jobId, listener), responseHolder, exceptionHolder); - assertTrue(responseHolder.get()); + blockingCall(listener -> jobConfigProvider.updateJobBlockReason( + jobId, new Blocked(Blocked.Reason.RESET, null), listener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); + assertThat(responseHolder.get().getResponse().getBlocked().getReason(), equalTo(Blocked.Reason.RESET)); } private static Job.Builder createJob(String jobId, List groups) { diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index e9ba392f4899b..80b45459f2630 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -523,7 +523,7 @@ public void testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown() t assertBusy(() -> { DataCounts dataCounts = getJobStats(jobId).getDataCounts(); - assertThat(dataCounts.getProcessedRecordCount(), greaterThanOrEqualTo(numDocs)); + assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs)); assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); }); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 30758c8b1c8f3..a54a202b738d0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -141,6 +141,7 @@ import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAliasAction; +import org.elasticsearch.xpack.core.ml.action.ResetJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; @@ -228,6 +229,7 @@ import org.elasticsearch.xpack.ml.action.TransportPutJobAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAliasAction; +import org.elasticsearch.xpack.ml.action.TransportResetJobAction; import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction; import 
org.elasticsearch.xpack.ml.action.TransportSetResetModeAction; import org.elasticsearch.xpack.ml.action.TransportSetUpgradeModeAction; @@ -368,6 +370,7 @@ import org.elasticsearch.xpack.ml.rest.job.RestPostDataAction; import org.elasticsearch.xpack.ml.rest.job.RestPostJobUpdateAction; import org.elasticsearch.xpack.ml.rest.job.RestPutJobAction; +import org.elasticsearch.xpack.ml.rest.job.RestResetJobAction; import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestDeleteModelSnapshotAction; import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestGetModelSnapshotsAction; import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestRevertModelSnapshotAction; @@ -932,6 +935,7 @@ public List getRestHandlers(Settings settings, RestController restC new RestPostDataAction(), new RestCloseJobAction(), new RestFlushJobAction(), + new RestResetJobAction(), new RestValidateDetectorAction(), new RestValidateJobConfigAction(), new RestEstimateModelMemoryAction(), @@ -1018,6 +1022,7 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(CloseJobAction.INSTANCE, TransportCloseJobAction.class), new ActionHandler<>(FinalizeJobExecutionAction.INSTANCE, TransportFinalizeJobExecutionAction.class), new ActionHandler<>(FlushJobAction.INSTANCE, TransportFlushJobAction.class), + new ActionHandler<>(ResetJobAction.INSTANCE, TransportResetJobAction.class), new ActionHandler<>(ValidateDetectorAction.INSTANCE, TransportValidateDetectorAction.class), new ActionHandler<>(ValidateJobConfigAction.INSTANCE, TransportValidateJobConfigAction.class), new ActionHandler<>(EstimateModelMemoryAction.INSTANCE, TransportEstimateModelMemoryAction.class), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index ac4353ddb4990..902b50221f88b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -11,6 +11,8 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; @@ -21,10 +23,10 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.core.CheckedConsumer; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.Nullable; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; @@ -34,6 +36,10 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; +import 
org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.ResetJobAction; +import org.elasticsearch.xpack.core.ml.job.config.Blocked; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; @@ -144,7 +150,7 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust } ); - ActionListener markAsDeletingListener = ActionListener.wrap( + ActionListener markAsDeletingListener = ActionListener.wrap( response -> { if (request.isForce()) { forceDeleteJob(parentTaskClient, request, finalListener); @@ -157,7 +163,7 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust ActionListener jobExistsListener = ActionListener.wrap( response -> { auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId)); - markJobAsDeletingIfNotUsed(request.getJobId(), markAsDeletingListener); + markJobAsDeletingIfNotUsed(request.getJobId(), taskId, markAsDeletingListener); }, e -> { if (request.isForce() @@ -229,7 +235,7 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ // Step 1. Delete the physical storage new JobDataDeleter(parentTaskClient, jobId).deleteJobDocuments( - jobId, jobConfigProvider, indexNameExpressionResolver, clusterService.state(), removeFromCalendarsHandler, listener::onFailure); + jobConfigProvider, indexNameExpressionResolver, clusterService.state(), removeFromCalendarsHandler, listener::onFailure); } private void forceDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request, @@ -304,7 +310,7 @@ private void checkJobIsNotOpen(String jobId, ClusterState state) { } } - private void markJobAsDeletingIfNotUsed(String jobId, ActionListener listener) { + private void markJobAsDeletingIfNotUsed(String jobId, TaskId taskId, ActionListener listener) { datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap( datafeedIds -> { @@ -313,9 +319,42 @@ private void markJobAsDeletingIfNotUsed(String jobId, ActionListener li + datafeedIds.iterator().next() + "] refers to it")); return; } - jobConfigProvider.markJobAsDeleting(jobId, listener); + cancelResetTaskIfExists(jobId, ActionListener.wrap( + response -> jobConfigProvider.updateJobBlockReason(jobId, new Blocked(Blocked.Reason.DELETE, taskId), listener), + listener::onFailure + )); }, listener::onFailure )); } + + private void cancelResetTaskIfExists(String jobId, ActionListener listener) { + ActionListener jobListener = ActionListener.wrap( + jobBuilder -> { + Job job = jobBuilder.build(); + if (job.getBlocked().getReason() == Blocked.Reason.RESET) { + logger.info("[{}] Cancelling reset task [{}] because delete was requested", jobId, job.getBlocked().getTaskId()); + CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); + cancelTasksRequest.setReason("deleting job"); + cancelTasksRequest.setActions(ResetJobAction.NAME); + cancelTasksRequest.setTaskId(job.getBlocked().getTaskId()); + executeAsyncWithOrigin(client, ML_ORIGIN, CancelTasksAction.INSTANCE, cancelTasksRequest, ActionListener.wrap( + cancelTasksResponse -> listener.onResponse(true), + e -> { + if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) { + listener.onResponse(true); + } else { + listener.onFailure(e); + } + } + )); + } else { + 
listener.onResponse(false); + } + }, + listener::onFailure + ); + + jobConfigProvider.getJob(jobId, jobListener); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetJobAction.java new file mode 100644 index 0000000000000..0c3cb5c4d977c --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetJobAction.java @@ -0,0 +1,212 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.ParentTaskAssigningClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.ResetJobAction; +import org.elasticsearch.xpack.core.ml.job.config.Blocked; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; + +import java.util.Objects; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + +public class TransportResetJobAction extends AcknowledgedTransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportResetJobAction.class); + + private final Client client; + private final JobConfigProvider jobConfigProvider; + private final JobResultsProvider jobResultsProvider; + private final 
AnomalyDetectionAuditor auditor; + + @Inject + public TransportResetJobAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client, + JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider, + AnomalyDetectionAuditor auditor) { + super(ResetJobAction.NAME, transportService, clusterService, threadPool, actionFilters, ResetJobAction.Request::new, + indexNameExpressionResolver, ThreadPool.Names.SAME); + this.client = Objects.requireNonNull(client); + this.jobConfigProvider = Objects.requireNonNull(jobConfigProvider); + this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); + this.auditor = Objects.requireNonNull(auditor); + } + + @Override + protected void masterOperation(Task task, ResetJobAction.Request request, ClusterState state, + ActionListener listener) throws Exception { + if (MlMetadata.getMlMetadata(state).isUpgradeMode()) { + listener.onFailure(ExceptionsHelper.conflictStatusException("cannot reset job while indices are being upgraded")); + return; + } + + final TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); + + ActionListener jobListener = ActionListener.wrap( + jobBuilder -> { + Job job = jobBuilder.build(); + PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + JobState jobState = MlTasks.getJobState(job.getId(), tasks); + if (request.isSkipJobStateValidation() == false && jobState != JobState.CLOSED) { + listener.onFailure(ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_RESET))); + return; + } + if (job.getBlocked().getReason() != Blocked.Reason.NONE && job.getBlocked().getReason() != Blocked.Reason.RESET) { + listener.onFailure(ExceptionsHelper.conflictStatusException( + "cannot reset job while it is blocked with [" + job.getBlocked().getReason() + "]")); + return; + } + + if (job.getBlocked().getReason() == Blocked.Reason.RESET) { + waitExistingResetTaskToComplete(job.getBlocked().getTaskId(), request, listener); + } else { + ParentTaskAssigningClient taskClient = new ParentTaskAssigningClient(client, taskId); + jobConfigProvider.updateJobBlockReason(job.getId(), new Blocked(Blocked.Reason.RESET, taskId), ActionListener.wrap( + r -> resetJob(taskClient, (CancellableTask) task, request, listener), + listener::onFailure + )); + } + }, + listener::onFailure + ); + + jobConfigProvider.getJob(request.getJobId(), jobListener); + } + + private void waitExistingResetTaskToComplete(TaskId existingTaskId, ResetJobAction.Request request, + ActionListener listener) { + logger.debug(() -> new ParameterizedMessage( + "[{}] Waiting on existing reset task: {}", request.getJobId(), existingTaskId)); + GetTaskRequest getTaskRequest = new GetTaskRequest(); + getTaskRequest.setTaskId(existingTaskId); + getTaskRequest.setWaitForCompletion(true); + getTaskRequest.setTimeout(request.timeout()); + executeAsyncWithOrigin(client, ML_ORIGIN, GetTaskAction.INSTANCE, getTaskRequest, ActionListener.wrap( + getTaskResponse -> { + TaskResult taskResult = getTaskResponse.getTask(); + if (taskResult.isCompleted()) { + listener.onResponse(AcknowledgedResponse.of(true)); + } else { + BytesReference taskError = taskResult.getError(); + if (taskError != null) { + listener.onFailure(ExceptionsHelper.serverError("reset failed to complete; error [{}]", + taskError.utf8ToString())); + } else { + 
listener.onFailure(ExceptionsHelper.serverError("reset failed to complete")); + } + } + }, + listener::onFailure + )); + } + + private void resetJob(ParentTaskAssigningClient taskClient, CancellableTask task, ResetJobAction.Request request, + ActionListener listener) { + String jobId = request.getJobId(); + + // Now that we have updated the job's block reason, we should check again + // if the job has been opened. + PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + JobState jobState = MlTasks.getJobState(jobId, tasks); + if (request.isSkipJobStateValidation() == false && jobState != JobState.CLOSED) { + jobConfigProvider.updateJobBlockReason(jobId, null, ActionListener.wrap( + clearResetResponse -> listener.onFailure(ExceptionsHelper.conflictStatusException( + Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_RESET))), + e -> listener.onFailure(ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_RESET))) + )); + return; + } + + logger.info("[{}] Resetting job", jobId); + + ActionListener resultsIndexCreatedListener = ActionListener.wrap( + resultsIndexCreatedResponse -> { + if (task.isCancelled()) { + listener.onResponse(AcknowledgedResponse.of(false)); + return; + } + finishSuccessfulReset(jobId, listener); + }, + listener::onFailure + ); + + CheckedConsumer jobDocsDeletionListener = response -> { + if (task.isCancelled()) { + listener.onResponse(AcknowledgedResponse.of(false)); + return; + } + jobConfigProvider.getJob(jobId, ActionListener.wrap( + jobBuilder -> { + if (task.isCancelled()) { + listener.onResponse(AcknowledgedResponse.of(false)); + return; + } + jobResultsProvider.createJobResultIndex( + jobBuilder.build(), clusterService.state(), resultsIndexCreatedListener); + }, + listener::onFailure + )); + }; + + JobDataDeleter jobDataDeleter = new JobDataDeleter(taskClient, jobId); + jobDataDeleter.deleteJobDocuments(jobConfigProvider, indexNameExpressionResolver, + clusterService.state(), jobDocsDeletionListener, listener::onFailure); + } + + private void finishSuccessfulReset(String jobId, ActionListener listener) { + jobConfigProvider.updateJobAfterReset(jobId, ActionListener.wrap( + blockReasonUpdatedResponse -> { + logger.info("[{}] Reset has successfully completed", jobId); + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_RESET)); + listener.onResponse(AcknowledgedResponse.of(true)); + }, + listener::onFailure + )); + } + + @Override + protected ClusterBlockException checkBlock(ResetJobAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index 566d372b3e721..9ceec682b2610 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -10,6 +10,8 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.action.support.ActionFilters; 
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; @@ -22,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlConfigIndex; @@ -29,6 +32,8 @@ import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.annotations.Annotation; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; +import org.elasticsearch.xpack.core.ml.job.config.Blocked; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -45,6 +50,9 @@ import java.util.Set; import java.util.function.Consumer; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + public class TransportRevertModelSnapshotAction extends TransportMasterNodeAction { @@ -75,6 +83,7 @@ public TransportRevertModelSnapshotAction(Settings settings, ThreadPool threadPo protected void masterOperation(Task task, RevertModelSnapshotAction.Request request, ClusterState state, ActionListener listener) { final String jobId = request.getJobId(); + final TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); if (migrationEligibilityCheck.jobIsEligibleForMigration(jobId, state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("revert model snapshot", jobId)); @@ -87,32 +96,44 @@ protected void masterOperation(Task task, RevertModelSnapshotAction.Request requ // 5. 
Revert the state ActionListener annotationsIndexUpdateListener = ActionListener.wrap( r -> { - PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); - JobState jobState = MlTasks.getJobState(jobId, tasks); - - if (request.isForce() == false && jobState.equals(JobState.CLOSED) == false) { - listener.onFailure(ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT))); - return; - } - - if (MlTasks.getSnapshotUpgraderTask(jobId, request.getSnapshotId(), tasks) != null) { - listener.onFailure(ExceptionsHelper.conflictStatusException( - "Cannot revert job [{}] to snapshot [{}] as it is being upgraded", - jobId, - request.getSnapshotId() - )); - return; - } + ActionListener jobListener = ActionListener.wrap( + job -> { + PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + JobState jobState = MlTasks.getJobState(job.getId(), tasks); + if (request.isForce() == false && jobState.equals(JobState.CLOSED) == false) { + listener.onFailure(ExceptionsHelper.conflictStatusException( + Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT))); + return; + } + if (MlTasks.getSnapshotUpgraderTask(jobId, request.getSnapshotId(), tasks) != null) { + listener.onFailure(ExceptionsHelper.conflictStatusException( + "Cannot revert job [{}] to snapshot [{}] as it is being upgraded", + jobId, + request.getSnapshotId() + )); + return; + } + isBlocked(job, ActionListener.wrap( + isBlocked -> { + if (isBlocked) { + listener.onFailure(ExceptionsHelper.conflictStatusException( + "cannot revert job [{}] to snapshot [{}] while it is blocked with [{}]", + jobId, request.getSnapshotId(), job.getBlocked().getReason()) + ); + } else { + jobManager.updateJobBlockReason(jobId, new Blocked(Blocked.Reason.REVERT, taskId), ActionListener.wrap( + aBoolean -> revertSnapshot(jobId, request, listener), + listener::onFailure + )); + } + }, + listener::onFailure + )); + }, + listener::onFailure + ); - getModelSnapshot(request, jobResultsProvider, modelSnapshot -> { - ActionListener wrappedListener = listener; - if (request.getDeleteInterveningResults()) { - wrappedListener = wrapDeleteOldAnnotationsListener(wrappedListener, modelSnapshot, jobId); - wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, jobId); - wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, jobId); - } - jobManager.revertSnapshot(request, wrappedListener, modelSnapshot); - }, listener::onFailure); + jobManager.getJob(jobId, jobListener); }, listener::onFailure ); @@ -142,6 +163,58 @@ protected void masterOperation(Task task, RevertModelSnapshotAction.Request requ createStateIndexListener); } + private void isBlocked(Job job, ActionListener listener) { + if (job.getBlocked().getReason() == Blocked.Reason.NONE) { + listener.onResponse(false); + return; + } + if (job.getBlocked().getReason() == Blocked.Reason.REVERT) { + // If another revert is called but there is a revert task running + // we do not allow it to run. However, if the job got stuck with + // a block on revert, it means the node that was running the previous + // revert failed. So, we allow a revert to run if the task has completed + // in order to complete and eventually unblock the job. 
+ GetTaskRequest getTaskRequest = new GetTaskRequest(); + getTaskRequest.setTaskId(job.getBlocked().getTaskId()); + executeAsyncWithOrigin(client, ML_ORIGIN, GetTaskAction.INSTANCE, getTaskRequest, ActionListener.wrap( + r -> listener.onResponse(r.getTask().isCompleted() == false), + e -> { + if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) { + listener.onResponse(false); + } else { + listener.onFailure(e); + } + } + )); + } else { + listener.onResponse(true); + } + } + + private void revertSnapshot(String jobId, RevertModelSnapshotAction.Request request, + ActionListener listener) { + ActionListener finalListener = ActionListener.wrap( + r -> jobManager.updateJobBlockReason(jobId, Blocked.none(), ActionListener.wrap( + aBoolean -> listener.onResponse(r), + listener::onFailure + )) + , e -> jobManager.updateJobBlockReason(jobId, Blocked.none(), ActionListener.wrap( + aBoolean -> listener.onFailure(e), + listener::onFailure + )) + ); + + getModelSnapshot(request, jobResultsProvider, modelSnapshot -> { + ActionListener wrappedListener = finalListener; + if (request.getDeleteInterveningResults()) { + wrappedListener = wrapDeleteOldAnnotationsListener(wrappedListener, modelSnapshot, jobId); + wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, jobId); + wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, jobId); + } + jobManager.revertSnapshot(request, wrappedListener, modelSnapshot); + }, listener::onFailure); + } + private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobResultsProvider provider, Consumer handler, Consumer errorHandler) { logger.info("Reverting to snapshot '" + request.getSnapshotId() + "'"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index d29f128852b6b..94b963d5654d8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -18,7 +18,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; @@ -28,6 +27,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; +import org.elasticsearch.xpack.core.ml.job.config.Blocked; import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -661,4 +662,8 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList actionListener::onFailure )); } + + public void updateJobBlockReason(String jobId, 
Blocked blocked, ActionListener listener) { + jobConfigProvider.updateJobBlockReason(jobId, blocked, listener); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 0f0727f1fa4a2..cbe59b21c423e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.join.ScoreMode; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; @@ -27,10 +26,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.update.UpdateAction; -import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.regex.Regex; @@ -42,8 +38,8 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.ExistsQueryBuilder; @@ -60,12 +56,16 @@ import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher; import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.Blocked; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; @@ -273,17 +273,16 @@ public interface UpdateValidator { * @param update The job update * @param maxModelMemoryLimit The maximum model memory allowed * @param validator The job update validator - * @param updatedJobListener Updated job listener + * @param listener Updated job listener */ public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, - UpdateValidator validator, ActionListener updatedJobListener) { + UpdateValidator validator, ActionListener listener) { GetRequest getRequest = new 
GetRequest(MlConfigIndex.indexName(), Job.documentId(jobId)); - executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener.Delegating<>(updatedJobListener) { - @Override - public void onResponse(GetResponse getResponse) { + executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap( + getResponse -> { if (getResponse.isExists() == false) { - delegate.onFailure(ExceptionsHelper.missingJobException(jobId)); + listener.onFailure(ExceptionsHelper.missingJobException(jobId)); return; } @@ -294,27 +293,34 @@ public void onResponse(GetResponse getResponse) { try { originalJob = parseJobLenientlyFromSource(source).build(); } catch (Exception e) { - delegate.onFailure(new ElasticsearchParseException("Failed to parse job configuration [" + jobId + "]", e)); + listener.onFailure(new ElasticsearchParseException("Failed to parse job configuration [" + jobId + "]", e)); return; } validator.validate(originalJob, update, ActionListener.wrap( - validated -> { - Job updatedJob; - try { - // Applying the update may result in a validation error - updatedJob = update.mergeWithJob(originalJob, maxModelMemoryLimit); - } catch (Exception e) { - delegate.onFailure(e); - return; - } + validated -> { + Job updatedJob; + try { + // Applying the update may result in a validation error + updatedJob = update.mergeWithJob(originalJob, maxModelMemoryLimit); + } catch (Exception e) { + listener.onFailure(e); + return; + } - indexUpdatedJob(updatedJob, seqNo, primaryTerm, delegate); - }, - delegate::onFailure + indexUpdatedJob(updatedJob, seqNo, primaryTerm, listener); + }, + listener::onFailure )); + }, + e -> { + if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) { + listener.onFailure(ExceptionsHelper.missingJobException(jobId)); + } else { + listener.onFailure(e); + } } - }); + )); } private void indexUpdatedJob(Job updatedJob, long seqNo, long primaryTerm, @@ -425,32 +431,18 @@ public void jobIdMatches(List ids, ActionListener> listener , client::search); } - /** - * Sets the job's {@code deleting} field to true - * @param jobId The job to mark as deleting - * @param listener Responds with true if successful else an error - */ - public void markJobAsDeleting(String jobId, ActionListener listener) { - UpdateRequest updateRequest = new UpdateRequest(MlConfigIndex.indexName(), Job.documentId(jobId)); - updateRequest.retryOnConflict(3); - updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - updateRequest.doc(Collections.singletonMap(Job.DELETING.getPreferredName(), Boolean.TRUE)); - - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, ActionListener.wrap( - response -> { - assert (response.getResult() == DocWriteResponse.Result.UPDATED) || - (response.getResult() == DocWriteResponse.Result.NOOP); - listener.onResponse(Boolean.TRUE); - }, - e -> { - ElasticsearchException[] causes = ElasticsearchException.guessRootCauses(e); - if (causes[0] instanceof DocumentMissingException) { - listener.onFailure(ExceptionsHelper.missingJobException(jobId)); - } else { - listener.onFailure(e); - } - } - )); + public void updateJobBlockReason(String jobId, Blocked blocked, ActionListener listener) { + JobUpdate jobUpdate = new JobUpdate.Builder(jobId).setBlocked(blocked).build(); + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, UpdateJobAction.Request.internal(jobId, jobUpdate), listener); + } + + public void updateJobAfterReset(String jobId, ActionListener listener) { + JobUpdate 
jobUpdate = new JobUpdate.Builder(jobId) + .setModelSnapshotId(ModelSnapshot.EMPTY_SNAPSHOT_ID) + .setBlocked(Blocked.none()) + .setClearFinishTime(true) + .build(); + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, UpdateJobAction.Request.internal(jobId, jobUpdate), listener); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index ab4571aa0c123..5c6dcbae037a0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -244,9 +244,9 @@ public void deleteDatafeedTimingStats(ActionListener liste /** * Deletes all documents associated with a job except user annotations and notifications */ - public void deleteJobDocuments(String jobId, JobConfigProvider jobConfigProvider, - IndexNameExpressionResolver indexNameExpressionResolver, ClusterState clusterState, - CheckedConsumer finishedHandler, Consumer failureHandler) { + public void deleteJobDocuments(JobConfigProvider jobConfigProvider, IndexNameExpressionResolver indexNameExpressionResolver, + ClusterState clusterState, CheckedConsumer finishedHandler, + Consumer failureHandler) { AtomicReference indexNames = new AtomicReference<>(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java index 112e2dbebe24e..b60e804fcb3c7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java @@ -35,7 +35,9 @@ import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.GetJobsAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.ResetJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.job.config.Blocked; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; @@ -165,8 +167,9 @@ static void validateJobAndId(String jobId, Job job) { if (job == null) { throw ExceptionsHelper.missingJobException(jobId); } - if (job.isDeleting()) { - throw ExceptionsHelper.conflictStatusException("Cannot open job [{}] because it is being deleted", jobId); + if (job.getBlocked().getReason() != Blocked.Reason.NONE) { + throw ExceptionsHelper.conflictStatusException("Cannot open job [{}] because it is executing [{}]", jobId, + job.getBlocked().getReason()); } if (job.getJobVersion() == null) { throw ExceptionsHelper.badRequestException( @@ -239,8 +242,7 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams ActionListener hasRunningDatafeedTaskListener = ActionListener.wrap( hasRunningDatafeed -> { - if (hasRunningDatafeed && clusterState.nodes().getMasterNode().getVersion().onOrAfter( - MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) { + if (hasRunningDatafeed && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) { // This job has a running 
datafeed attached to it. // In order to prevent gaps in the model we revert to the current snapshot deleting intervening results. @@ -255,6 +257,10 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams hasRunningDatafeedTask(jobTask.getJobId(), hasRunningDatafeedTaskListener); } + private boolean isMasterNodeVersionOnOrAfter(Version version) { + return clusterState.nodes().getMasterNode().getVersion().onOrAfter(version); + } + private void hasRunningDatafeedTask(String jobId, ActionListener listener) { ActionListener> datafeedListener = ActionListener.wrap( datafeeds -> { @@ -275,9 +281,7 @@ private void hasRunningDatafeedTask(String jobId, ActionListener listen datafeedConfigProvider.findDatafeedsForJobIds(Collections.singleton(jobId), datafeedListener); } - private void revertToCurrentSnapshot(String jobId, ActionListener listener) { - logger.info("[{}] job has running datafeed task; reverting to current snapshot", jobId); - + private void revertToCurrentSnapshot(String jobId, ActionListener listener) { ActionListener jobListener = ActionListener.wrap( jobResponse -> { List jobPage = jobResponse.getResponse().results(); @@ -285,12 +289,28 @@ private void revertToCurrentSnapshot(String jobId, ActionListener listener.onResponse(true), + listener::onFailure + )); + } else { + logger.info("[{}] job has running datafeed task; reverting to current snapshot", jobId); + RevertModelSnapshotAction.Request request = new RevertModelSnapshotAction.Request(jobId, + jobSnapshotId == null ? ModelSnapshot.EMPTY_SNAPSHOT_ID : jobSnapshotId); + request.setForce(true); + request.setDeleteInterveningResults(true); + request.masterNodeTimeout(PERSISTENT_TASK_MASTER_NODE_TIMEOUT); + executeAsyncWithOrigin(client, ML_ORIGIN, RevertModelSnapshotAction.INSTANCE, request, ActionListener.wrap( + response -> listener.onResponse(true), + listener::onFailure + )); + } }, error -> listener.onFailure(ExceptionsHelper.serverError("[{}] error getting job", error, jobId)) ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestResetJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestResetJobAction.java new file mode 100644 index 0000000000000..7f568a9cc1f49 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestResetJobAction.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */
+
+package org.elasticsearch.xpack.ml.rest.job;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.BytesRestResponse;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskListener;
+import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.POST;
+import static org.elasticsearch.xpack.ml.MachineLearning.BASE_PATH;
+
+public class RestResetJobAction extends BaseRestHandler {
+
+    @Override
+    public List<Route> routes() {
+        return List.of(
+            new Route(POST, BASE_PATH + "anomaly_detectors/{" + Job.ID + "}/_reset")
+        );
+    }
+
+    @Override
+    public String getName() {
+        return "ml_reset_job_action";
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
+        ResetJobAction.Request request = new ResetJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
+        request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
+        request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
+
+        if (restRequest.paramAsBoolean("wait_for_completion", true)) {
+            return channel -> client.execute(ResetJobAction.INSTANCE, request, new RestToXContentListener<>(channel));
+        } else {
+            request.setShouldStoreResult(true);
+            Task task = client.executeLocally(ResetJobAction.INSTANCE, request, nullTaskListener());
+            return channel -> {
+                try (XContentBuilder builder = channel.newBuilder()) {
+                    builder.startObject();
+                    builder.field("task", client.getLocalNodeId() + ":" + task.getId());
+                    builder.endObject();
+                    channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
+                }
+            };
+        }
+    }
+
+    // We do not want to log anything due to the reset action
+    // The response or error will be returned to the client when called synchronously
+    // or it will be stored in the task result when called asynchronously
+    private static <T> TaskListener<T> nullTaskListener() {
+        return new TaskListener<T>() {
+            @Override
+            public void onResponse(Task task, T o) {}
+
+            @Override
+            public void onFailure(Task task, Exception e) {}
+        };
+    }
+}
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java
index 13184d1a84ac5..318bcc8ce140f 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java
@@ -18,6 +18,7 @@
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobTests;

@@ -40,7 +41,9 @@ protected MlMetadata createTestInstance() {
         MlMetadata.Builder builder = new MlMetadata.Builder();
         int numJobs = randomIntBetween(0, 10);
         for (int i = 0; i < numJobs; i++) {
-            Job job = JobTests.createRandomizedJob();
+            Job.Builder
job = new Job.Builder(JobTests.createRandomizedJob()); + job.setDeleting(false); + job.setBlocked(Blocked.none()); if (randomBoolean()) { AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job.getAnalysisConfig()); analysisConfig.setLatency(null); @@ -49,11 +52,11 @@ protected MlMetadata createTestInstance() { if (datafeedConfig.hasAggregations()) { analysisConfig.setSummaryCountFieldName("doc_count"); } - job = new Job.Builder(job).setAnalysisConfig(analysisConfig).build(); - builder.putJob(job, false); + job.setAnalysisConfig(analysisConfig).build(); + builder.putJob(job.build(), false); builder.putDatafeed(datafeedConfig, Collections.emptyMap(), xContentRegistry()); } else { - builder.putJob(job, false); + builder.putJob(job.build(), false); } } return builder.isResetMode(randomBoolean()).isUpgradeMode(randomBoolean()).build(); @@ -181,7 +184,7 @@ protected MlMetadata mutateInstance(MlMetadata instance) { if (datafeedConfig.hasAggregations()) { analysisConfig.setSummaryCountFieldName("doc_count"); } - randomJob = new Job.Builder(randomJob).setAnalysisConfig(analysisConfig).build(); + randomJob = new Job.Builder(randomJob).setAnalysisConfig(analysisConfig).setDeleting(false).setBlocked(Blocked.none()).build(); metadataBuilder.putJob(randomJob, false); metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap(), xContentRegistry()); break; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java index fe80e5561d020..21e83e3f18808 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.Blocked; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.Detector; @@ -115,7 +116,15 @@ public void testValidate_jobMarkedAsDeleting() { jobBuilder.setDeleting(true); Exception e = expectThrows(ElasticsearchStatusException.class, () -> validateJobAndId("job_id", jobBuilder.build())); - assertEquals("Cannot open job [job_id] because it is being deleted", e.getMessage()); + assertEquals("Cannot open job [job_id] because it is executing [delete]", e.getMessage()); + } + + public void testValidate_blockedReset() { + Job.Builder jobBuilder = buildJobBuilder("job_id"); + jobBuilder.setBlocked(new Blocked(Blocked.Reason.REVERT, null)); + Exception e = expectThrows(ElasticsearchStatusException.class, + () -> validateJobAndId("job_id", jobBuilder.build())); + assertEquals("Cannot open job [job_id] because it is executing [revert]", e.getMessage()); } public void testValidate_jobWithoutVersion() { diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 63e1c5dd51f92..0f949996ff130 100644 --- 
a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
+++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
@@ -154,6 +154,7 @@ public class Constants {
         "cluster:admin/xpack/ml/job/open",
         "cluster:admin/xpack/ml/job/persist",
         "cluster:admin/xpack/ml/job/put",
+        "cluster:admin/xpack/ml/job/reset",
         "cluster:admin/xpack/ml/job/update",
         "cluster:admin/xpack/ml/job/validate",
         "cluster:admin/xpack/ml/job/validate/detector",
diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/reset_job.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/reset_job.yml
new file mode 100644
index 0000000000000..9025960901cf5
--- /dev/null
+++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/reset_job.yml
@@ -0,0 +1,47 @@
+setup:
+  - skip:
+      features: headers
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      ml.put_job:
+        job_id: reset-job
+        body: >
+          {
+            "job_id":"reset-job",
+            "analysis_config" : {
+              "bucket_span": "1h",
+              "detectors" :[{"function":"count"}]
+            },
+            "data_description" : {
+              "field_delimiter":",",
+              "time_field":"time",
+              "time_format":"yyyy-MM-dd HH:mm:ssX"
+            }
+          }
+
+---
+"Test reset":
+  - do:
+      ml.reset_job:
+        job_id: reset-job
+  - match: { acknowledged: true }
+
+---
+"Test reset given unknown job id":
+  - do:
+      catch: missing
+      ml.reset_job:
+        job_id: not-a-job
+
+---
+"Test reset given job is open":
+  - do:
+      ml.open_job:
+        job_id: reset-job
+  - match: { opened: true }
+
+  - do:
+      catch: /Can only reset a job when it is closed/
+      ml.reset_job:
+        job_id: reset-job
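
The YAML suite above exercises the synchronous path and two failure cases. A follow-up case along the lines of the sketch below could cover the asynchronous path driven by the `wait_for_completion` parameter declared in `ml.reset_job.json`; it is not part of the change itself, and the test name and the `is_true: task` assertion are assumptions based on the task id the REST handler returns when `wait_for_completion` is `false`.

---
"Test reset returns a task id when wait_for_completion is false":
  - do:
      ml.reset_job:
        job_id: reset-job
        wait_for_completion: false
  # The reset continues in the background, so the response is expected to carry only the task id (assumed shape).
  - is_true: task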