Skip to content

[ML] Reset anomaly detection job API #73908

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/reference/ml/anomaly-detection/apis/get-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=exclude-generated]
The API returns an array of {anomaly-job} resources. For the full list of
properties, see <<ml-put-job-request-body,create {anomaly-jobs} API>>.

//Begin blocked
`blocked`::
(object) When present, it explains that a task is executed on the job
that blocks it from opening.
+
.Properties of `blocked`
[%collapsible%open]
====
`reason`:::
(string) The reason the job is blocked. Values may be `delete`, `reset`, `revert`.
Each value means the corresponding action is being executed.

`task_id`:::
(string) The task id of the blocking action. You can use the <<tasks>> API to
monitor progress.
====
//End blocked

`create_time`::
(string) The time the job was created. For example, `1491007356077`. This
property is informational; you cannot change its value.
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/ml/anomaly-detection/apis/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ include::open-job.asciidoc[leveloffset=+2]
include::post-data.asciidoc[leveloffset=+2]
//PREVIEW
include::preview-datafeed.asciidoc[leveloffset=+2]
//RESET
include::reset-job.asciidoc[leveloffset=+2]
//REVERT
include::revert-snapshot.asciidoc[leveloffset=+2]
//SET/START/STOP
Expand Down
1 change: 1 addition & 0 deletions docs/reference/ml/anomaly-detection/apis/ml-apis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ See also <<ml-df-analytics-apis>>.
* <<ml-post-data,Post data to {anomaly-jobs}>>
* <<ml-update-job,Update {anomaly-jobs}>>
* <<ml-forecast,Create>> or <<ml-delete-forecast,delete forecasts>>
* <<ml-reset-job,Reset {anomaly-jobs}>>


[discrete]
Expand Down
86 changes: 86 additions & 0 deletions docs/reference/ml/anomaly-detection/apis/reset-job.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
[role="xpack"]
[testenv="platinum"]
[[ml-reset-job]]
= Reset {anomaly-jobs} API
++++
<titleabbrev>Reset jobs</titleabbrev>
++++

Resets an existing {anomaly-job}.

[[ml-reset-job-request]]
== {api-request-title}

`POST _ml/anomaly_detectors/<job_id>/_reset`

[[ml-reset-job-prereqs]]
== {api-prereq-title}

* Requires the `manage_ml` cluster privilege. This privilege is included in the
`machine_learning_admin` built-in role.
* Before you can reset a job, you must close it. You can set `force` to `true`
when closing the job to avoid waiting for the job to complete. See
<<ml-close-job>>.

[[ml-reset-job-desc]]
== {api-description-title}

All model state and results are deleted.
The job is ready to start over as if it had just been created.

It is not currently possible to reset multiple jobs using wildcards or a comma
separated list.

[[ml-reset-job-path-parms]]
== {api-path-parms-title}

`<job_id>`::
(Required, string)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-anomaly-detection]

[[ml-reset-job-query-parms]]
== {api-query-parms-title}

`wait_for_completion`::
(Optional, Boolean) Specifies whether the request should return immediately or
wait until the job reset completes. Defaults to `true`.

[[ml-reset-job-example]]
== {api-examples-title}

[source,console]
--------------------------------------------------
POST _ml/anomaly_detectors/total-requests/_reset
--------------------------------------------------
// TEST[skip:setup:server_metrics_job]

When the job is reset, you receive the following results:

[source,console-result]
----
{
"acknowledged": true
}
----

In the next example we reset the `total-requests` job asynchronously:

[source,console]
--------------------------------------------------
POST _ml/anomaly_detectors/total-requests/_reset?wait_for_completion=false
--------------------------------------------------
// TEST[skip:setup:server_metrics_job]

When `wait_for_completion` is set to `false`, the response contains the id
of the job reset task:
Comment on lines +74 to +75
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be good to indicate that to check on the status of the task, you shouldn't call the same API again and instead should use the tasks API.


[source,console-result]
----
{
"task": "oTUltX4IQMOUUVeiohTt8A:39"
}
----
// TESTRESPONSE[s/"task": "oTUltX4IQMOUUVeiohTt8A:39"/"task": $body.task/]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// TESTRESPONSE[s/"task": "oTUltX4IQMOUUVeiohTt8A:39"/"task": $body.task/]
// TESTRESPONSE[s/"task": "oTUltX4IQMOUUVeiohTt8A:39"/"task": $body.task/]
If you want to check the status of the reset task, use the <<tasks>> by referencing
the task ID.


If you want to check the status of the reset task, use the <<tasks>> by referencing
the task ID.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"ml.reset_job":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-reset-job.html",
"description":"Resets an existing anomaly detection job."
},
"stability":"stable",
"visibility":"public",
"headers":{
"accept": [ "application/json"]
},
"url":{
"paths":[
{
"path":"/_ml/anomaly_detectors/{job_id}/_reset",
"methods":[
"POST"
],
"parts":{
"job_id":{
"type":"string",
"description":"The ID of the job to reset"
}
}
}
]
},
"params":{
"wait_for_completion":{
"type":"boolean",
"description":"Should this request wait until the operation has completed before returning",
"default":true
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

public class ResetJobAction extends ActionType<AcknowledgedResponse> {

public static final String NAME = "cluster:admin/xpack/ml/job/reset";
public static final ResetJobAction INSTANCE = new ResetJobAction();

public static final Version VERSION_INTRODUCED = Version.V_7_14_0;

private ResetJobAction() {
super(NAME, AcknowledgedResponse::readFrom);
}

public static class Request extends AcknowledgedRequest<Request> {

private String jobId;

/**
* Internal parameter that allows resetting an open job
* when a job is reallocated to a new node.
*/
private boolean skipJobStateValidation;

/**
* Should this task store its result?
*/
private boolean shouldStoreResult;

public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID);
}

public Request(StreamInput in) throws IOException {
super(in);
jobId = in.readString();
skipJobStateValidation = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeBoolean(skipJobStateValidation);
}

public void setSkipJobStateValidation(boolean skipJobStateValidation) {
this.skipJobStateValidation = skipJobStateValidation;
}

public boolean isSkipJobStateValidation() {
return skipJobStateValidation;
}

/**
* Should this task store its result after it has finished?
*/
public void setShouldStoreResult(boolean shouldStoreResult) {
this.shouldStoreResult = shouldStoreResult;
}

@Override
public boolean getShouldStoreResult() {
return shouldStoreResult;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, MlTasks.JOB_TASK_ID_PREFIX + jobId, parentTaskId, headers);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

public String getJobId() {
return jobId;
}

@Override
public int hashCode() {
return Objects.hash(jobId, skipJobStateValidation);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || o.getClass() != getClass()) return false;
Request that = (Request) o;
return Objects.equals(jobId, that.jobId) && skipJobStateValidation == that.skipJobStateValidation;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.job.config;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Locale;
import java.util.Objects;

public class Blocked implements ToXContentObject, Writeable {

public enum Reason {
NONE, DELETE, RESET, REVERT;

public static Reason fromString(String value) {
return Reason.valueOf(value.toUpperCase(Locale.ROOT));
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}

public static final ParseField REASON = new ParseField("reason");
public static final ParseField TASK_ID = new ParseField("task_id");

public static final ConstructingObjectParser<Blocked, Void> LENIENT_PARSER = createParser(true);
public static final ConstructingObjectParser<Blocked, Void> STRICT_PARSER = createParser(false);

private static ConstructingObjectParser<Blocked, Void> createParser(boolean ignoreUnknownFields) {
ConstructingObjectParser<Blocked, Void> parser = new ConstructingObjectParser<>("blocked", ignoreUnknownFields,
a -> new Blocked((Reason) a[0], (TaskId) a[1]));
parser.declareString(ConstructingObjectParser.constructorArg(), Reason::fromString, REASON);
parser.declareString(ConstructingObjectParser.optionalConstructorArg(), TaskId::new, TASK_ID);
return parser;
}

private final Reason reason;

@Nullable
private final TaskId taskId;

public Blocked(Reason reason, @Nullable TaskId taskId) {
this.reason = Objects.requireNonNull(reason);
this.taskId = taskId;
}

public Blocked(StreamInput in) throws IOException {
this.reason = in.readEnum(Reason.class);
this.taskId = in.readOptionalWriteable(TaskId::readFromStream);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(reason);
out.writeOptionalWriteable(taskId);
}

public Reason getReason() {
return reason;
}

@Nullable
public TaskId getTaskId() {
return taskId;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(REASON.getPreferredName(), reason);
if (taskId != null) {
builder.field(TASK_ID.getPreferredName(), taskId.toString());
}
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(reason, taskId);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Blocked that = (Blocked) o;
return Objects.equals(reason, that.reason) && Objects.equals(taskId, that.taskId);
}

public static Blocked none() {
return new Blocked(Reason.NONE, null);
}
}
Loading