Skip to content

Commit 0a659e7

Browse files
authored
[ML][Data Frame] responding with 409 status code when failing _stop (#44231) (#44277)
* [ML][Data Frame] responding with appropriate status code when failing _stop * adding null checks for persistent task data * addressing PR comments
1 parent cecb5f9 commit 0a659e7

File tree

4 files changed

+139
-5
lines changed

4 files changed

+139
-5
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java

+3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ public class DataFrameMessages {
3232
public static final String DATA_FRAME_FAILED_TO_PERSIST_STATS = "Failed to persist data frame statistics for transform [{0}]";
3333
public static final String DATA_FRAME_UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
3434

35+
public static final String DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM =
36+
"Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." +
37+
" Use force stop to stop the data frame transform.";
3538
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
3639
public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION =
3740
"Failed to load data frame transform configuration for transform [{0}]";

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ protected void createContinuousPivotReviewsTransform(String transformId, String
161161

162162
String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
163163
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
164-
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}},"
164+
//Set frequency high for testing
165+
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\", \"frequency\": \"1s\"}},"
165166
+ " \"pivot\": {"
166167
+ " \"group_by\": {"
167168
+ " \"reviewer\": {"

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java

+40-4
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,29 @@
2121
import org.elasticsearch.common.inject.Inject;
2222
import org.elasticsearch.common.unit.TimeValue;
2323
import org.elasticsearch.discovery.MasterNotDiscoveredException;
24+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2425
import org.elasticsearch.persistent.PersistentTasksService;
2526
import org.elasticsearch.rest.RestStatus;
2627
import org.elasticsearch.tasks.Task;
2728
import org.elasticsearch.threadpool.ThreadPool;
2829
import org.elasticsearch.transport.TransportService;
2930
import org.elasticsearch.xpack.core.action.util.PageParams;
31+
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
3032
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
33+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
3134
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
3235
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
3336
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
3437
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
3538

39+
import java.util.ArrayList;
3640
import java.util.Collection;
3741
import java.util.HashSet;
3842
import java.util.List;
3943
import java.util.Set;
4044

45+
import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM;
46+
4147
public class TransportStopDataFrameTransformAction extends
4248
TransportTasksAction<DataFrameTransformTask, StopDataFrameTransformAction.Request,
4349
StopDataFrameTransformAction.Response, StopDataFrameTransformAction.Response> {
@@ -63,6 +69,32 @@ public TransportStopDataFrameTransformAction(TransportService transportService,
6369
this.client = client;
6470
}
6571

72+
static void validateTaskState(ClusterState state, List<String> transformIds, boolean isForce) {
73+
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
74+
if (isForce == false && tasks != null) {
75+
List<String> failedTasks = new ArrayList<>();
76+
List<String> failedReasons = new ArrayList<>();
77+
for (String transformId : transformIds) {
78+
PersistentTasksCustomMetaData.PersistentTask<?> dfTask = tasks.getTask(transformId);
79+
if (dfTask != null
80+
&& dfTask.getState() instanceof DataFrameTransformState
81+
&& ((DataFrameTransformState) dfTask.getState()).getTaskState() == DataFrameTransformTaskState.FAILED) {
82+
failedTasks.add(transformId);
83+
failedReasons.add(((DataFrameTransformState) dfTask.getState()).getReason());
84+
}
85+
}
86+
if (failedTasks.isEmpty() == false) {
87+
String msg = failedTasks.size() == 1 ?
88+
DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM,
89+
failedTasks.get(0),
90+
failedReasons.get(0)) :
91+
"Unable to stop data frame transforms. The following transforms are in a failed state " +
92+
failedTasks + " with reasons " + failedReasons + ". Use force stop to stop the data frame transforms.";
93+
throw new ElasticsearchStatusException(msg, RestStatus.CONFLICT);
94+
}
95+
}
96+
}
97+
6698
@Override
6799
protected void doExecute(Task task, StopDataFrameTransformAction.Request request,
68100
ActionListener<StopDataFrameTransformAction.Response> listener) {
@@ -88,8 +120,9 @@ protected void doExecute(Task task, StopDataFrameTransformAction.Request request
88120
new PageParams(0, 10_000),
89121
request.isAllowNoMatch(),
90122
ActionListener.wrap(hitsAndIds -> {
123+
validateTaskState(state, hitsAndIds.v2(), request.isForce());
91124
request.setExpandedIds(new HashSet<>(hitsAndIds.v2()));
92-
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), clusterService.state()));
125+
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), state));
93126
super.doExecute(task, request, finalListener);
94127
},
95128
listener::onFailure
@@ -108,11 +141,14 @@ protected void taskOperation(StopDataFrameTransformAction.Request request, DataF
108141
}
109142

110143
if (ids.contains(transformTask.getTransformId())) {
144+
// This should not occur as we validate that none of the tasks are in a failed state earlier
145+
// Keep this check in here for insurance.
111146
if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
112147
listener.onFailure(
113-
new ElasticsearchStatusException("Unable to stop data frame transform [" + request.getId()
114-
+ "] as it is in a failed state with reason: [" + transformTask.getState().getReason() +
115-
"]. Use force stop to stop the data frame transform.",
148+
new ElasticsearchStatusException(
149+
DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM,
150+
request.getId(),
151+
transformTask.getState().getReason()),
116152
RestStatus.CONFLICT));
117153
return;
118154
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.dataframe.action;
7+
8+
import org.elasticsearch.ElasticsearchStatusException;
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.cluster.ClusterName;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.metadata.MetaData;
13+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
14+
import org.elasticsearch.test.ESTestCase;
15+
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
16+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
17+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
18+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
19+
import org.elasticsearch.xpack.core.indexing.IndexerState;
20+
21+
import java.util.Arrays;
22+
import java.util.Collections;
23+
24+
import static org.elasticsearch.rest.RestStatus.CONFLICT;
25+
import static org.hamcrest.Matchers.equalTo;
26+
27+
public class TransportStopDataFrameTransformActionTests extends ESTestCase {
28+
29+
private MetaData.Builder buildMetadata(PersistentTasksCustomMetaData ptasks) {
30+
return MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, ptasks);
31+
}
32+
33+
public void testTaskStateValidationWithNoTasks() {
34+
MetaData.Builder metaData = MetaData.builder();
35+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(metaData);
36+
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
37+
38+
PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder();
39+
csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
40+
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
41+
}
42+
43+
public void testTaskStateValidationWithDataFrameTasks() {
44+
// Test with the task state being null
45+
PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
46+
.addTask("non-failed-task",
47+
DataFrameTransform.NAME,
48+
new DataFrameTransform("data-frame-task-1", Version.CURRENT, null),
49+
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", ""));
50+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
51+
52+
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
53+
54+
// test again with a non failed task but this time it has internal state
55+
pTasksBuilder.updateTaskState("non-failed-task", new DataFrameTransformState(DataFrameTransformTaskState.STOPPED,
56+
IndexerState.STOPPED,
57+
null,
58+
0L,
59+
null,
60+
null));
61+
csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
62+
63+
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
64+
65+
pTasksBuilder.addTask("failed-task",
66+
DataFrameTransform.NAME,
67+
new DataFrameTransform("data-frame-task-1", Version.CURRENT, null),
68+
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", ""))
69+
.updateTaskState("failed-task", new DataFrameTransformState(DataFrameTransformTaskState.FAILED,
70+
IndexerState.STOPPED,
71+
null,
72+
0L,
73+
"task has failed",
74+
null));
75+
csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
76+
77+
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Arrays.asList("non-failed-task", "failed-task"), true);
78+
79+
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
80+
81+
ClusterState.Builder csBuilderFinal = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
82+
ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class,
83+
() -> TransportStopDataFrameTransformAction.validateTaskState(csBuilderFinal.build(),
84+
Collections.singletonList("failed-task"),
85+
false));
86+
87+
assertThat(ex.status(), equalTo(CONFLICT));
88+
assertThat(ex.getMessage(),
89+
equalTo(DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM,
90+
"failed-task",
91+
"task has failed")));
92+
}
93+
94+
}

0 commit comments

Comments
 (0)