Skip to content

Commit 92f7c62

Browse files
[7.x][ML] Reset anomaly detection job API (#73908) (#74093)
Adds a new API that allows a user to reset an anomaly detection job. To use the API do: ``` POST _ml/anomaly_detectors/<job_id>_reset ``` The API removes all data associated to the job. In particular, it deletes model state, results and stats. However, job notifications and user annotations are not removed. Also, the API can be called asynchronously by setting the parameter `wait_for_completion` to `false` (defaults to `true`). When run that way the API returns the task id for further monitoring. In order to prevent the job from opening while it is resetting, a new job field has been added called `blocked`. It is an object that contains a `reason` and the `task_id`. `reason` can take a value from ["delete", "reset", "revert"] as all these operations should block the job from opening. The `task_id` is also included in order to allow tracking the task if necessary. Finally, this commit also sets the `blocked` field when the revert snapshot API is called as a job should not be opened while it is reverted to a different model snapshot. Backport of #73908
1 parent a45598a commit 92f7c62

File tree

37 files changed

+1358
-174
lines changed

37 files changed

+1358
-174
lines changed

docs/reference/ml/anomaly-detection/apis/get-job.asciidoc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,24 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=exclude-generated]
5959
The API returns an array of {anomaly-job} resources. For the full list of
6060
properties, see <<ml-put-job-request-body,create {anomaly-jobs} API>>.
6161

62+
//Begin blocked
63+
`blocked`::
64+
(object) When present, it explains that a task is executed on the job
65+
that blocks it from opening.
66+
+
67+
.Properties of `blocked`
68+
[%collapsible%open]
69+
====
70+
`reason`:::
71+
(string) The reason the job is blocked. Values may be `delete`, `reset`, `revert`.
72+
Each value means the corresponding action is being executed.
73+
74+
`task_id`:::
75+
(string) The task id of the blocking action. You can use the <<tasks>> API to
76+
monitor progress.
77+
====
78+
//End blocked
79+
6280
`create_time`::
6381
(string) The time the job was created. For example, `1491007356077`. This
6482
property is informational; you cannot change its value.

docs/reference/ml/anomaly-detection/apis/index.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ include::open-job.asciidoc[leveloffset=+2]
4848
include::post-data.asciidoc[leveloffset=+2]
4949
//PREVIEW
5050
include::preview-datafeed.asciidoc[leveloffset=+2]
51+
//RESET
52+
include::reset-job.asciidoc[leveloffset=+2]
5153
//REVERT
5254
include::revert-snapshot.asciidoc[leveloffset=+2]
5355
//SET/START/STOP

docs/reference/ml/anomaly-detection/apis/ml-apis.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ See also <<ml-df-analytics-apis>>.
1919
* <<ml-post-data,Post data to {anomaly-jobs}>>
2020
* <<ml-update-job,Update {anomaly-jobs}>>
2121
* <<ml-forecast,Create>> or <<ml-delete-forecast,delete forecasts>>
22+
* <<ml-reset-job,Reset {anomaly-jobs}>>
2223

2324

2425
[discrete]
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
[role="xpack"]
2+
[testenv="platinum"]
3+
[[ml-reset-job]]
4+
= Reset {anomaly-jobs} API
5+
++++
6+
<titleabbrev>Reset jobs</titleabbrev>
7+
++++
8+
9+
Resets an existing {anomaly-job}.
10+
11+
[[ml-reset-job-request]]
12+
== {api-request-title}
13+
14+
`POST _ml/anomaly_detectors/<job_id>/_reset`
15+
16+
[[ml-reset-job-prereqs]]
17+
== {api-prereq-title}
18+
19+
* Requires the `manage_ml` cluster privilege. This privilege is included in the
20+
`machine_learning_admin` built-in role.
21+
* Before you can reset a job, you must close it. You can set `force` to `true`
22+
when closing the job to avoid waiting for the job to complete. See
23+
<<ml-close-job>>.
24+
25+
[[ml-reset-job-desc]]
26+
== {api-description-title}
27+
28+
All model state and results are deleted.
29+
The job is ready to start over as if it had just been created.
30+
31+
It is not currently possible to reset multiple jobs using wildcards or a comma
32+
separated list.
33+
34+
[[ml-reset-job-path-parms]]
35+
== {api-path-parms-title}
36+
37+
`<job_id>`::
38+
(Required, string)
39+
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-anomaly-detection]
40+
41+
[[ml-reset-job-query-parms]]
42+
== {api-query-parms-title}
43+
44+
`wait_for_completion`::
45+
(Optional, Boolean) Specifies whether the request should return immediately or
46+
wait until the job reset completes. Defaults to `true`.
47+
48+
[[ml-reset-job-example]]
49+
== {api-examples-title}
50+
51+
[source,console]
52+
--------------------------------------------------
53+
POST _ml/anomaly_detectors/total-requests/_reset
54+
--------------------------------------------------
55+
// TEST[skip:setup:server_metrics_job]
56+
57+
When the job is reset, you receive the following results:
58+
59+
[source,console-result]
60+
----
61+
{
62+
"acknowledged": true
63+
}
64+
----
65+
66+
In the next example we reset the `total-requests` job asynchronously:
67+
68+
[source,console]
69+
--------------------------------------------------
70+
POST _ml/anomaly_detectors/total-requests/_reset?wait_for_completion=false
71+
--------------------------------------------------
72+
// TEST[skip:setup:server_metrics_job]
73+
74+
When `wait_for_completion` is set to `false`, the response contains the id
75+
of the job reset task:
76+
77+
[source,console-result]
78+
----
79+
{
80+
"task": "oTUltX4IQMOUUVeiohTt8A:39"
81+
}
82+
----
83+
// TESTRESPONSE[s/"task": "oTUltX4IQMOUUVeiohTt8A:39"/"task": $body.task/]
84+
85+
If you want to check the status of the reset task, use the <<tasks>> by referencing
86+
the task ID.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
{
2+
"ml.reset_job":{
3+
"documentation":{
4+
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-reset-job.html",
5+
"description":"Resets an existing anomaly detection job."
6+
},
7+
"stability":"stable",
8+
"visibility":"public",
9+
"headers":{
10+
"accept": [ "application/json"]
11+
},
12+
"url":{
13+
"paths":[
14+
{
15+
"path":"/_ml/anomaly_detectors/{job_id}/_reset",
16+
"methods":[
17+
"POST"
18+
],
19+
"parts":{
20+
"job_id":{
21+
"type":"string",
22+
"description":"The ID of the job to reset"
23+
}
24+
}
25+
}
26+
]
27+
},
28+
"params":{
29+
"wait_for_completion":{
30+
"type":"boolean",
31+
"description":"Should this request wait until the operation has completed before returning",
32+
"default":true
33+
}
34+
}
35+
}
36+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ml.action;
9+
10+
import org.elasticsearch.Version;
11+
import org.elasticsearch.action.ActionRequestValidationException;
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
14+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.tasks.CancellableTask;
18+
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.tasks.TaskId;
20+
import org.elasticsearch.xpack.core.ml.MlTasks;
21+
import org.elasticsearch.xpack.core.ml.job.config.Job;
22+
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
23+
24+
import java.io.IOException;
25+
import java.util.Map;
26+
import java.util.Objects;
27+
28+
public class ResetJobAction extends ActionType<AcknowledgedResponse> {
29+
30+
public static final String NAME = "cluster:admin/xpack/ml/job/reset";
31+
public static final ResetJobAction INSTANCE = new ResetJobAction();
32+
33+
public static final Version VERSION_INTRODUCED = Version.V_7_14_0;
34+
35+
private ResetJobAction() {
36+
super(NAME, AcknowledgedResponse::readFrom);
37+
}
38+
39+
public static class Request extends AcknowledgedRequest<Request> {
40+
41+
private String jobId;
42+
43+
/**
44+
* Internal parameter that allows resetting an open job
45+
* when a job is reallocated to a new node.
46+
*/
47+
private boolean skipJobStateValidation;
48+
49+
/**
50+
* Should this task store its result?
51+
*/
52+
private boolean shouldStoreResult;
53+
54+
public Request(String jobId) {
55+
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID);
56+
}
57+
58+
public Request(StreamInput in) throws IOException {
59+
super(in);
60+
jobId = in.readString();
61+
skipJobStateValidation = in.readBoolean();
62+
}
63+
64+
@Override
65+
public void writeTo(StreamOutput out) throws IOException {
66+
super.writeTo(out);
67+
out.writeString(jobId);
68+
out.writeBoolean(skipJobStateValidation);
69+
}
70+
71+
public void setSkipJobStateValidation(boolean skipJobStateValidation) {
72+
this.skipJobStateValidation = skipJobStateValidation;
73+
}
74+
75+
public boolean isSkipJobStateValidation() {
76+
return skipJobStateValidation;
77+
}
78+
79+
/**
80+
* Should this task store its result after it has finished?
81+
*/
82+
public void setShouldStoreResult(boolean shouldStoreResult) {
83+
this.shouldStoreResult = shouldStoreResult;
84+
}
85+
86+
@Override
87+
public boolean getShouldStoreResult() {
88+
return shouldStoreResult;
89+
}
90+
91+
@Override
92+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
93+
return new CancellableTask(id, type, action, MlTasks.JOB_TASK_ID_PREFIX + jobId, parentTaskId, headers);
94+
}
95+
96+
@Override
97+
public ActionRequestValidationException validate() {
98+
return null;
99+
}
100+
101+
public String getJobId() {
102+
return jobId;
103+
}
104+
105+
@Override
106+
public int hashCode() {
107+
return Objects.hash(jobId, skipJobStateValidation);
108+
}
109+
110+
@Override
111+
public boolean equals(Object o) {
112+
if (this == o) return true;
113+
if (o == null || o.getClass() != getClass()) return false;
114+
Request that = (Request) o;
115+
return Objects.equals(jobId, that.jobId) && skipJobStateValidation == that.skipJobStateValidation;
116+
}
117+
}
118+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ml.job.config;
9+
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.io.stream.Writeable;
13+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
14+
import org.elasticsearch.common.xcontent.ParseField;
15+
import org.elasticsearch.common.xcontent.ToXContentObject;
16+
import org.elasticsearch.common.xcontent.XContentBuilder;
17+
import org.elasticsearch.core.Nullable;
18+
import org.elasticsearch.tasks.TaskId;
19+
20+
import java.io.IOException;
21+
import java.util.Locale;
22+
import java.util.Objects;
23+
24+
public class Blocked implements ToXContentObject, Writeable {
25+
26+
public enum Reason {
27+
NONE, DELETE, RESET, REVERT;
28+
29+
public static Reason fromString(String value) {
30+
return Reason.valueOf(value.toUpperCase(Locale.ROOT));
31+
}
32+
33+
@Override
34+
public String toString() {
35+
return name().toLowerCase(Locale.ROOT);
36+
}
37+
}
38+
39+
public static final ParseField REASON = new ParseField("reason");
40+
public static final ParseField TASK_ID = new ParseField("task_id");
41+
42+
public static final ConstructingObjectParser<Blocked, Void> LENIENT_PARSER = createParser(true);
43+
public static final ConstructingObjectParser<Blocked, Void> STRICT_PARSER = createParser(false);
44+
45+
private static ConstructingObjectParser<Blocked, Void> createParser(boolean ignoreUnknownFields) {
46+
ConstructingObjectParser<Blocked, Void> parser = new ConstructingObjectParser<>("blocked", ignoreUnknownFields,
47+
a -> new Blocked((Reason) a[0], (TaskId) a[1]));
48+
parser.declareString(ConstructingObjectParser.constructorArg(), Reason::fromString, REASON);
49+
parser.declareString(ConstructingObjectParser.optionalConstructorArg(), TaskId::new, TASK_ID);
50+
return parser;
51+
}
52+
53+
private final Reason reason;
54+
55+
@Nullable
56+
private final TaskId taskId;
57+
58+
public Blocked(Reason reason, @Nullable TaskId taskId) {
59+
this.reason = Objects.requireNonNull(reason);
60+
this.taskId = taskId;
61+
}
62+
63+
public Blocked(StreamInput in) throws IOException {
64+
this.reason = in.readEnum(Reason.class);
65+
this.taskId = in.readOptionalWriteable(TaskId::readFromStream);
66+
}
67+
68+
@Override
69+
public void writeTo(StreamOutput out) throws IOException {
70+
out.writeEnum(reason);
71+
out.writeOptionalWriteable(taskId);
72+
}
73+
74+
public Reason getReason() {
75+
return reason;
76+
}
77+
78+
@Nullable
79+
public TaskId getTaskId() {
80+
return taskId;
81+
}
82+
83+
@Override
84+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
85+
builder.startObject();
86+
builder.field(REASON.getPreferredName(), reason);
87+
if (taskId != null) {
88+
builder.field(TASK_ID.getPreferredName(), taskId.toString());
89+
}
90+
builder.endObject();
91+
return builder;
92+
}
93+
94+
@Override
95+
public int hashCode() {
96+
return Objects.hash(reason, taskId);
97+
}
98+
99+
@Override
100+
public boolean equals(Object o) {
101+
if (this == o) return true;
102+
if (o == null || getClass() != o.getClass()) return false;
103+
104+
Blocked that = (Blocked) o;
105+
return Objects.equals(reason, that.reason) && Objects.equals(taskId, that.taskId);
106+
}
107+
108+
public static Blocked none() {
109+
return new Blocked(Reason.NONE, null);
110+
}
111+
}

0 commit comments

Comments
 (0)