Skip to content

[ML][Data Frame] responding with 409 status code when failing _stop #44231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class DataFrameMessages {
public static final String DATA_FRAME_FAILED_TO_PERSIST_STATS = "Failed to persist data frame statistics for transform [{0}]";
public static final String DATA_FRAME_UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";

public static final String DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM =
"Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." +
" Use force stop to stop the data frame transform.";
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION =
"Failed to load data frame transform configuration for transform [{0}]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ protected void createContinuousPivotReviewsTransform(String transformId, String

String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}},"
//Set frequency high for testing

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "Set frequency high" - did you mean low? The default is 60s, so 1s is lower?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High frequency => low time between repeats

(Similar to high frequency => short wavelength)

But if it's confusing then maybe bypass this by changing the comment to something like // Set frequency to 1s to make the test run as fast as possible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of those pedantic things that annoy me: frequency is measured in Hz, not seconds; this is an interval. However, most people will read this and understand that frequency represents the period here, and it is probably the least confusing naming option.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A high frequency in Hz means a short distance between cycles.

A high frequency here means a short time between attempts.

+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\", \"frequency\": \"1s\"}},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,29 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM;

public class TransportStopDataFrameTransformAction extends
TransportTasksAction<DataFrameTransformTask, StopDataFrameTransformAction.Request,
StopDataFrameTransformAction.Response, StopDataFrameTransformAction.Response> {
Expand All @@ -63,6 +69,32 @@ public TransportStopDataFrameTransformAction(TransportService transportService,
this.client = client;
}

/**
 * Rejects a non-forced stop request up front when any of the requested transforms is in a failed state.
 *
 * Looks up each transform id in the persistent tasks custom metadata of the given cluster state. Nothing is
 * checked when {@code isForce} is {@code true} or when the cluster state carries no persistent tasks metadata.
 *
 * @param state        cluster state whose persistent tasks metadata is inspected
 * @param transformIds ids of the transforms that are about to be stopped
 * @param isForce      whether the stop request is a force stop; failed transforms are only rejected when {@code false}
 * @throws ElasticsearchStatusException with {@link RestStatus#CONFLICT} if at least one of the transforms is in
 *                                      the {@link DataFrameTransformTaskState#FAILED} state and the stop is not forced
 */
static void validateTaskState(ClusterState state, List<String> transformIds, boolean isForce) {
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (isForce == false && tasks != null) {
// Parallel lists: failedReasons.get(i) is the failure reason of failedTasks.get(i)
List<String> failedTasks = new ArrayList<>();
List<String> failedReasons = new ArrayList<>();
for (String transformId : transformIds) {
PersistentTasksCustomMetaData.PersistentTask<?> dfTask = tasks.getTask(transformId);
// Ids without a persistent task, or whose task state is not a DataFrameTransformState, are ignored here
if (dfTask != null
&& dfTask.getState() instanceof DataFrameTransformState
&& ((DataFrameTransformState) dfTask.getState()).getTaskState() == DataFrameTransformTaskState.FAILED) {
failedTasks.add(transformId);
failedReasons.add(((DataFrameTransformState) dfTask.getState()).getReason());
}
}
if (failedTasks.isEmpty() == false) {
// A single failure uses the shared message template; several failures are reported together in one message
String msg = failedTasks.size() == 1 ?
DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM,
failedTasks.get(0),
failedReasons.get(0)) :
"Unable to stop data frame transforms. The following transforms are in a failed state " +
failedTasks + " with reasons " + failedReasons + ". Use force stop to stop the data frame transforms.";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use a constant

throw new ElasticsearchStatusException(msg, RestStatus.CONFLICT);
}
}
}

@Override
protected void doExecute(Task task, StopDataFrameTransformAction.Request request,
ActionListener<StopDataFrameTransformAction.Response> listener) {
Expand All @@ -88,8 +120,9 @@ protected void doExecute(Task task, StopDataFrameTransformAction.Request request
new PageParams(0, 10_000),
request.isAllowNoMatch(),
ActionListener.wrap(hitsAndIds -> {
validateTaskState(state, hitsAndIds.v2(), request.isForce());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: taskstate -> taskstates (or is it tasksstates?)

request.setExpandedIds(new HashSet<>(hitsAndIds.v2()));
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), clusterService.state()));
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), state));
super.doExecute(task, request, finalListener);
},
listener::onFailure
Expand All @@ -108,11 +141,14 @@ protected void taskOperation(StopDataFrameTransformAction.Request request, DataF
}

if (ids.contains(transformTask.getTransformId())) {
// This should not occur as we validate that none of the tasks are in a failed state earlier
// Keep this check in here for insurance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
listener.onFailure(
new ElasticsearchStatusException("Unable to stop data frame transform [" + request.getId()
+ "] as it is in a failed state with reason: [" + transformTask.getState().getReason() +
"]. Use force stop to stop the data frame transform.",
new ElasticsearchStatusException(
DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM,
request.getId(),
transformTask.getState().getReason()),
RestStatus.CONFLICT));
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.action;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.indexing.IndexerState;

import java.util.Arrays;
import java.util.Collections;

import static org.elasticsearch.rest.RestStatus.CONFLICT;
import static org.hamcrest.Matchers.equalTo;

/**
 * Tests for {@code TransportStopDataFrameTransformAction.validateTaskState}: the call must pass silently unless a
 * non-forced stop targets a transform whose persistent task is in the failed state, in which case a
 * {@code CONFLICT} status exception is expected.
 */
public class TransportStopDataFrameTransformActionTests extends ESTestCase {

    private static final String HEALTHY_TASK_ID = "non-failed-task";
    private static final String FAILED_TASK_ID = "failed-task";
    private static final String FAILURE_REASON = "task has failed";

    /** Builds a cluster state named {@code _name} around the given metadata. */
    private static ClusterState stateFromMetaData(MetaData.Builder metaData) {
        return ClusterState.builder(new ClusterName("_name")).metaData(metaData).build();
    }

    /** Builds a cluster state whose only custom metadata is a snapshot of the given persistent tasks. */
    private static ClusterState stateFromTasks(PersistentTasksCustomMetaData.Builder tasks) {
        return stateFromMetaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()));
    }

    /** Registers a data frame transform persistent task under {@code taskId} and returns the builder for chaining. */
    private static PersistentTasksCustomMetaData.Builder addTransformTask(PersistentTasksCustomMetaData.Builder tasks,
                                                                         String taskId) {
        return tasks.addTask(
            taskId,
            DataFrameTransform.NAME,
            new DataFrameTransform("data-frame-task-1", Version.CURRENT, null),
            new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", ""));
    }

    /** Creates a transform state with a stopped indexer, checkpoint 0, and the given task state and reason. */
    private static DataFrameTransformState transformState(DataFrameTransformTaskState taskState, String reason) {
        return new DataFrameTransformState(taskState, IndexerState.STOPPED, null, 0L, reason, null);
    }

    public void testTaskStateValidationWithNoTasks() {
        // No persistent tasks custom metadata at all
        ClusterState withoutTaskMetaData = stateFromMetaData(MetaData.builder());
        TransportStopDataFrameTransformAction.validateTaskState(
            withoutTaskMetaData, Collections.singletonList(HEALTHY_TASK_ID), false);

        // Persistent tasks custom metadata is present but holds no tasks
        ClusterState withEmptyTaskMetaData = stateFromTasks(PersistentTasksCustomMetaData.builder());
        TransportStopDataFrameTransformAction.validateTaskState(
            withEmptyTaskMetaData, Collections.singletonList(HEALTHY_TASK_ID), false);
    }

    public void testTaskStateValidationWithDataFrameTasks() {
        // A registered task that has not reported any state yet
        PersistentTasksCustomMetaData.Builder tasks = addTransformTask(PersistentTasksCustomMetaData.builder(), HEALTHY_TASK_ID);
        TransportStopDataFrameTransformAction.validateTaskState(
            stateFromTasks(tasks), Collections.singletonList(HEALTHY_TASK_ID), false);

        // The same task, now carrying a non-failed internal state
        tasks.updateTaskState(HEALTHY_TASK_ID, transformState(DataFrameTransformTaskState.STOPPED, null));
        TransportStopDataFrameTransformAction.validateTaskState(
            stateFromTasks(tasks), Collections.singletonList(HEALTHY_TASK_ID), false);

        // Add a second task that is in the failed state
        addTransformTask(tasks, FAILED_TASK_ID)
            .updateTaskState(FAILED_TASK_ID, transformState(DataFrameTransformTaskState.FAILED, FAILURE_REASON));
        ClusterState stateWithFailedTask = stateFromTasks(tasks);

        // A forced stop is accepted even though one of the tasks has failed
        TransportStopDataFrameTransformAction.validateTaskState(
            stateWithFailedTask, Arrays.asList(HEALTHY_TASK_ID, FAILED_TASK_ID), true);

        // A non-forced stop that only targets the healthy task is accepted as well
        TransportStopDataFrameTransformAction.validateTaskState(
            stateWithFailedTask, Collections.singletonList(HEALTHY_TASK_ID), false);

        // A non-forced stop of the failed task must be rejected with a conflict
        ClusterState rejectedState = stateFromTasks(tasks);
        ElasticsearchStatusException conflict = expectThrows(
            ElasticsearchStatusException.class,
            () -> TransportStopDataFrameTransformAction.validateTaskState(
                rejectedState, Collections.singletonList(FAILED_TASK_ID), false));

        String expectedMessage = DataFrameMessages.getMessage(
            DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, FAILED_TASK_ID, FAILURE_REASON);
        assertThat(conflict.status(), equalTo(CONFLICT));
        assertThat(conflict.getMessage(), equalTo(expectedMessage));
    }

}