Skip to content

Commit bff8840

Browse files
authored
[ML] make more GET and heavier processing APIs cancellable (elastic#88142)
This commit makes the following APIs cancellable: get datafeeds get jobs evaluate data frames explain data frames get job model snapshot preview datafeed preview data frame get categories (this can be expensive due to grok parsing)
1 parent 7234730 commit bff8840

File tree

55 files changed

+521
-143
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+521
-143
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/EvaluateDataFrameAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
import org.elasticsearch.common.xcontent.XContentParserUtils;
1717
import org.elasticsearch.core.Nullable;
1818
import org.elasticsearch.index.query.QueryBuilder;
19+
import org.elasticsearch.tasks.CancellableTask;
20+
import org.elasticsearch.tasks.Task;
21+
import org.elasticsearch.tasks.TaskId;
1922
import org.elasticsearch.xcontent.ConstructingObjectParser;
2023
import org.elasticsearch.xcontent.ParseField;
2124
import org.elasticsearch.xcontent.ToXContentObject;
@@ -30,6 +33,7 @@
3033
import java.io.IOException;
3134
import java.util.Arrays;
3235
import java.util.List;
36+
import java.util.Map;
3337
import java.util.Objects;
3438
import java.util.Optional;
3539

@@ -175,6 +179,11 @@ public boolean equals(Object o) {
175179
&& Objects.equals(queryProvider, that.queryProvider)
176180
&& Objects.equals(evaluation, that.evaluation);
177181
}
182+
183+
@Override
184+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
185+
return new CancellableTask(id, type, action, "evaluate_data_frame", parentTaskId, headers);
186+
}
178187
}
179188

180189
public static class Response extends ActionResponse implements ToXContentObject {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ExplainDataFrameAnalyticsAction.java

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,36 @@
66
*/
77
package org.elasticsearch.xpack.core.ml.action;
88

9+
import org.elasticsearch.action.ActionRequestValidationException;
910
import org.elasticsearch.action.ActionResponse;
1011
import org.elasticsearch.action.ActionType;
12+
import org.elasticsearch.action.ValidateActions;
13+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1114
import org.elasticsearch.common.io.stream.StreamInput;
1215
import org.elasticsearch.common.io.stream.StreamOutput;
16+
import org.elasticsearch.tasks.CancellableTask;
17+
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.tasks.TaskId;
1319
import org.elasticsearch.xcontent.ConstructingObjectParser;
1420
import org.elasticsearch.xcontent.ParseField;
1521
import org.elasticsearch.xcontent.ToXContentObject;
1622
import org.elasticsearch.xcontent.XContentBuilder;
23+
import org.elasticsearch.xcontent.XContentParser;
24+
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
25+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
26+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
1727
import org.elasticsearch.xpack.core.ml.dataframe.explain.FieldSelection;
1828
import org.elasticsearch.xpack.core.ml.dataframe.explain.MemoryEstimation;
29+
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
30+
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
1931

2032
import java.io.IOException;
2133
import java.util.List;
34+
import java.util.Map;
2235
import java.util.Objects;
2336

37+
import static org.elasticsearch.core.Strings.format;
38+
2439
public class ExplainDataFrameAnalyticsAction extends ActionType<ExplainDataFrameAnalyticsAction.Response> {
2540

2641
public static final ExplainDataFrameAnalyticsAction INSTANCE = new ExplainDataFrameAnalyticsAction();
@@ -30,6 +45,118 @@ private ExplainDataFrameAnalyticsAction() {
3045
super(NAME, ExplainDataFrameAnalyticsAction.Response::new);
3146
}
3247

48+
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
49+
public static Request parseRequest(XContentParser parser) {
50+
DataFrameAnalyticsConfig.Builder configBuilder = DataFrameAnalyticsConfig.STRICT_PARSER.apply(parser, null);
51+
DataFrameAnalyticsConfig config = configBuilder.buildForExplain();
52+
return new Request(config);
53+
}
54+
55+
private final DataFrameAnalyticsConfig config;
56+
57+
public Request(StreamInput in) throws IOException {
58+
super(in);
59+
config = new DataFrameAnalyticsConfig(in);
60+
}
61+
62+
public Request(DataFrameAnalyticsConfig config) {
63+
this.config = config;
64+
}
65+
66+
@Override
67+
public void writeTo(StreamOutput out) throws IOException {
68+
super.writeTo(out);
69+
config.writeTo(out);
70+
}
71+
72+
public DataFrameAnalyticsConfig getConfig() {
73+
return config;
74+
}
75+
76+
@Override
77+
public ActionRequestValidationException validate() {
78+
ActionRequestValidationException error = null;
79+
error = checkConfigIdIsValid(config, error);
80+
error = SourceDestValidator.validateRequest(error, config.getDest().getIndex());
81+
error = checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(config, error);
82+
return error;
83+
}
84+
85+
private ActionRequestValidationException checkConfigIdIsValid(
86+
DataFrameAnalyticsConfig analyticsConfig,
87+
ActionRequestValidationException error
88+
) {
89+
if (MlStrings.isValidId(analyticsConfig.getId()) == false) {
90+
error = ValidateActions.addValidationError(
91+
Messages.getMessage(Messages.INVALID_ID, DataFrameAnalyticsConfig.ID, analyticsConfig.getId()),
92+
error
93+
);
94+
}
95+
if (MlStrings.hasValidLengthForId(analyticsConfig.getId()) == false) {
96+
error = ValidateActions.addValidationError(
97+
Messages.getMessage(
98+
Messages.ID_TOO_LONG,
99+
DataFrameAnalyticsConfig.ID,
100+
analyticsConfig.getId(),
101+
MlStrings.ID_LENGTH_LIMIT
102+
),
103+
error
104+
);
105+
}
106+
return error;
107+
}
108+
109+
private ActionRequestValidationException checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(
110+
DataFrameAnalyticsConfig analyticsConfig,
111+
ActionRequestValidationException error
112+
) {
113+
if (analyticsConfig.getAnalyzedFields() == null) {
114+
return error;
115+
}
116+
for (String analyzedInclude : analyticsConfig.getAnalyzedFields().includes()) {
117+
if (analyticsConfig.getSource().isFieldExcluded(analyzedInclude)) {
118+
return ValidateActions.addValidationError(
119+
"field ["
120+
+ analyzedInclude
121+
+ "] is included in ["
122+
+ DataFrameAnalyticsConfig.ANALYZED_FIELDS.getPreferredName()
123+
+ "] but not in ["
124+
+ DataFrameAnalyticsConfig.SOURCE.getPreferredName()
125+
+ "."
126+
+ DataFrameAnalyticsSource._SOURCE.getPreferredName()
127+
+ "]",
128+
error
129+
);
130+
}
131+
}
132+
return error;
133+
}
134+
135+
@Override
136+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
137+
config.toXContent(builder, params);
138+
return builder;
139+
}
140+
141+
@Override
142+
public boolean equals(Object o) {
143+
if (this == o) return true;
144+
if (o == null || getClass() != o.getClass()) return false;
145+
Request request = (Request) o;
146+
return Objects.equals(config, request.config);
147+
}
148+
149+
@Override
150+
public int hashCode() {
151+
return Objects.hash(config);
152+
}
153+
154+
@Override
155+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
156+
return new CancellableTask(id, type, action, format("explain_data_frame_analytics[%s]", config.getId()), parentTaskId, headers);
157+
}
158+
}
159+
33160
public static class Response extends ActionResponse implements ToXContentObject {
34161

35162
public static final ParseField TYPE = new ParseField("explain_data_frame_analytics_response");

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesAction.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
import org.elasticsearch.action.ActionType;
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.tasks.CancellableTask;
15+
import org.elasticsearch.tasks.Task;
16+
import org.elasticsearch.tasks.TaskId;
1417
import org.elasticsearch.xcontent.ObjectParser;
1518
import org.elasticsearch.xcontent.ParseField;
1619
import org.elasticsearch.xcontent.ToXContentObject;
@@ -24,9 +27,11 @@
2427
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2528

2629
import java.io.IOException;
30+
import java.util.Map;
2731
import java.util.Objects;
2832

2933
import static org.elasticsearch.action.ValidateActions.addValidationError;
34+
import static org.elasticsearch.core.Strings.format;
3035

3136
public class GetCategoriesAction extends ActionType<GetCategoriesAction.Response> {
3237

@@ -190,6 +195,11 @@ public boolean equals(Object o) {
190195
public int hashCode() {
191196
return Objects.hash(jobId, categoryId, pageParams, partitionFieldValue);
192197
}
198+
199+
@Override
200+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
201+
return new CancellableTask(id, type, action, format("get_categories[%s:%s]", jobId, categoryId), parentTaskId, headers);
202+
}
193203
}
194204

195205
public static class Response extends AbstractGetResourcesResponse<CategoryDefinition> implements ToXContentObject {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsAction.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,21 @@
1111
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.tasks.CancellableTask;
15+
import org.elasticsearch.tasks.Task;
16+
import org.elasticsearch.tasks.TaskId;
1417
import org.elasticsearch.xcontent.ToXContentObject;
1518
import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse;
1619
import org.elasticsearch.xpack.core.action.util.QueryPage;
1720
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
1821
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
1922

2023
import java.io.IOException;
24+
import java.util.Map;
2125
import java.util.Objects;
2226

27+
import static org.elasticsearch.core.Strings.format;
28+
2329
public class GetDatafeedsAction extends ActionType<GetDatafeedsAction.Response> {
2430

2531
public static final GetDatafeedsAction INSTANCE = new GetDatafeedsAction();
@@ -93,6 +99,11 @@ public boolean equals(Object obj) {
9399
Request other = (Request) obj;
94100
return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(allowNoMatch, other.allowNoMatch);
95101
}
102+
103+
@Override
104+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
105+
return new CancellableTask(id, type, action, format("get_datafeeds[%s]", datafeedId), parentTaskId, headers);
106+
}
96107
}
97108

98109
public static class Response extends AbstractGetResourcesResponse<DatafeedConfig> implements ToXContentObject {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsAction.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,21 @@
1111
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.tasks.CancellableTask;
15+
import org.elasticsearch.tasks.Task;
16+
import org.elasticsearch.tasks.TaskId;
1417
import org.elasticsearch.xcontent.ToXContentObject;
1518
import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse;
1619
import org.elasticsearch.xpack.core.action.util.QueryPage;
1720
import org.elasticsearch.xpack.core.ml.job.config.Job;
1821
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
1922

2023
import java.io.IOException;
24+
import java.util.Map;
2125
import java.util.Objects;
2226

27+
import static org.elasticsearch.core.Strings.format;
28+
2329
public class GetJobsAction extends ActionType<GetJobsAction.Response> {
2430

2531
public static final GetJobsAction INSTANCE = new GetJobsAction();
@@ -91,6 +97,11 @@ public boolean equals(Object obj) {
9197
Request other = (Request) obj;
9298
return Objects.equals(jobId, other.jobId) && Objects.equals(allowNoMatch, other.allowNoMatch);
9399
}
100+
101+
@Override
102+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
103+
return new CancellableTask(id, type, action, format("get_anomaly_detection_jobs[%s]", jobId), parentTaskId, headers);
104+
}
94105
}
95106

96107
public static class Response extends AbstractGetResourcesResponse<Job> implements ToXContentObject {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetModelSnapshotsAction.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
1414
import org.elasticsearch.core.Nullable;
15+
import org.elasticsearch.tasks.CancellableTask;
16+
import org.elasticsearch.tasks.Task;
17+
import org.elasticsearch.tasks.TaskId;
1518
import org.elasticsearch.xcontent.ObjectParser;
1619
import org.elasticsearch.xcontent.ParseField;
1720
import org.elasticsearch.xcontent.ToXContentObject;
@@ -25,8 +28,11 @@
2528
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2629

2730
import java.io.IOException;
31+
import java.util.Map;
2832
import java.util.Objects;
2933

34+
import static org.elasticsearch.core.Strings.format;
35+
3036
public class GetModelSnapshotsAction extends ActionType<GetModelSnapshotsAction.Response> {
3137

3238
public static final GetModelSnapshotsAction INSTANCE = new GetModelSnapshotsAction();
@@ -205,6 +211,11 @@ public boolean equals(Object obj) {
205211
&& Objects.equals(sort, other.sort)
206212
&& Objects.equals(desc, other.desc);
207213
}
214+
215+
@Override
216+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
217+
return new CancellableTask(id, type, action, format("get_job_model_snapshot[%s:%s]", jobId, snapshotId), parentTaskId, headers);
218+
}
208219
}
209220

210221
public static class Response extends AbstractGetResourcesResponse<ModelSnapshot> implements ToXContentObject {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDataFrameAnalyticsAction.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
import org.elasticsearch.action.ActionType;
1313
import org.elasticsearch.common.io.stream.StreamInput;
1414
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.tasks.CancellableTask;
16+
import org.elasticsearch.tasks.Task;
17+
import org.elasticsearch.tasks.TaskId;
1518
import org.elasticsearch.xcontent.ConstructingObjectParser;
1619
import org.elasticsearch.xcontent.ObjectParser;
1720
import org.elasticsearch.xcontent.ParseField;
@@ -26,6 +29,8 @@
2629
import java.util.Map;
2730
import java.util.Objects;
2831

32+
import static org.elasticsearch.core.Strings.format;
33+
2934
public class PreviewDataFrameAnalyticsAction extends ActionType<PreviewDataFrameAnalyticsAction.Response> {
3035

3136
public static final PreviewDataFrameAnalyticsAction INSTANCE = new PreviewDataFrameAnalyticsAction();
@@ -87,6 +92,11 @@ public int hashCode() {
8792
return Objects.hash(config);
8893
}
8994

95+
@Override
96+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
97+
return new CancellableTask(id, type, action, format("preview_data_frame_analytics[%s]", config.getId()), parentTaskId, headers);
98+
}
99+
90100
public static class Builder {
91101
private DataFrameAnalyticsConfig config;
92102

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedAction.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
import org.elasticsearch.common.io.stream.StreamInput;
1818
import org.elasticsearch.common.io.stream.StreamOutput;
1919
import org.elasticsearch.core.Nullable;
20+
import org.elasticsearch.tasks.CancellableTask;
21+
import org.elasticsearch.tasks.Task;
22+
import org.elasticsearch.tasks.TaskId;
2023
import org.elasticsearch.xcontent.ObjectParser;
2124
import org.elasticsearch.xcontent.ParseField;
2225
import org.elasticsearch.xcontent.ToXContentObject;
@@ -29,9 +32,11 @@
2932

3033
import java.io.IOException;
3134
import java.io.InputStream;
35+
import java.util.Map;
3236
import java.util.Objects;
3337
import java.util.OptionalLong;
3438

39+
import static org.elasticsearch.core.Strings.format;
3540
import static org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.DatafeedParams.parseDateOrThrow;
3641
import static org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.END_TIME;
3742
import static org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.START_TIME;
@@ -199,6 +204,11 @@ public boolean equals(Object obj) {
199204
&& Objects.equals(jobConfig, other.jobConfig);
200205
}
201206

207+
@Override
208+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
209+
return new CancellableTask(id, type, action, format("preview_datafeed[%s]", datafeedId), parentTaskId, headers);
210+
}
211+
202212
public static class Builder {
203213
private String datafeedId;
204214
private DatafeedConfig.Builder datafeedBuilder;

0 commit comments

Comments
 (0)