Skip to content

[ML][Data Frame] responding with 409 status code when failing _stop #44231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ protected void createContinuousPivotReviewsTransform(String transformId, String

String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}},"
//Set frequency high for testing

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "Set frequency high" - DYM low? The default is 60s, so 1s is lower?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High frequency => low time between repeats

(Similar to high frequency => short wavelength)

But if it's confusing then maybe bypass this by changing the comment to something like // Set frequency to 1s to make the test run as fast as possible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of those pedantic things that annoy me; frequency is measured in Hz not seconds, this is an interval. However, most people will read this and understand that frequency represents the period here and is probably the least confusing naming option.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A high frequency in Hz means a short distance between cycles.

A high frequency here means a short time between attempts.

+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\", \"frequency\": \"1s\"}},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
Expand All @@ -30,53 +29,47 @@ public void testFailureStateInteraction() throws Exception {
createReviewsIndex();
String transformId = "failure_pivot_1";
String dataFrameIndex = "failure_pivot_reviews";
createPivotReviewsTransform(transformId, dataFrameIndex, null);
deleteIndex(REVIEWS_INDEX_NAME); // trigger start failure due to index missing
createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
startDataframeTransform(transformId, false);
// wait for our initial indexing to complete
waitForDataFrameCheckpoint(transformId);

// Deleting the only concrete index should make the checkpoint gathering fail
deleteIndex(REVIEWS_INDEX_NAME); // trigger start failure due to index missing

awaitState(transformId, DataFrameTransformTaskState.FAILED);
Map<?, ?> fullState = getDataFrameState(transformId);

// Verify we have failed for the expected reason
assertThat(XContentMapValues.extractValue("state.reason", fullState),
equalTo("task encountered irrecoverable failure: no such index [reviews]"));
assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started"));

// Verify that we cannot stop or start the transform when the task is in a failed state
ResponseException ex = expectThrows(ResponseException.class, () -> stopDataFrameTransform(transformId, false));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo("Unable to stop data frame transform [failure_pivot_1] as it is in a failed state with reason: [" +
"task encountered irrecoverable failure: no such index [reviews]]. Use force stop to stop the data frame transform."));
equalTo("Unable to stop data frame transform [failure_pivot_1] as it is in a failed state with reason [" +
"task encountered irrecoverable failure: no such index [reviews]. " +
"Use force stop to stop the data frame transform."));

ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo("Unable to start data frame transform [failure_pivot_1] as it is in a failed state with failure: [" +
"task encountered irrecoverable failure: no such index [reviews]]. " +
"task encountered irrecoverable failure: no such index [reviews]. " +
"Use force start to restart data frame transform once error is resolved."));

// Correct the failure by creating the reviews index again
createReviewsIndex();
// Force start the data frame to indicate failure correction
startDataframeTransform(transformId, true);
// Wait for data to be indexed appropriately and refresh for search
waitForDataFrameCheckpoint(transformId);
refreshIndex(dataFrameIndex);
awaitState(transformId, DataFrameTransformTaskState.STARTED);

// Verify that we have started and that our reason is cleared
fullState = getDataFrameState(transformId);
assertThat(XContentMapValues.extractValue("state.reason", fullState), is(nullValue()));
assertThat(XContentMapValues.extractValue("state.task_state", fullState), equalTo("started"));
assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started"));
assertThat(XContentMapValues.extractValue("stats.search_failures", fullState), equalTo(1));

// get and check some users to verify we restarted
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);


stopDataFrameTransform(transformId, true);
deleteDataFrameTransform(transformId);
Expand All @@ -86,14 +79,6 @@ private void awaitState(String transformId, DataFrameTransformTaskState state) t
assertBusy(() -> {
String currentState = getDataFrameTaskState(transformId);
assertThat(state.value(), equalTo(currentState));
});
}

private void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
assertEquals(expected, actual, 0.000001);
}, 60, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -88,8 +91,31 @@ protected void doExecute(Task task, StopDataFrameTransformAction.Request request
new PageParams(0, 10_000),
request.isAllowNoMatch(),
ActionListener.wrap(hitsAndIds -> {
if (request.isForce() == false) {
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is the main change. If you really want to test this code extract it to a static method and it is easily unit testable. MlTasksTests is a good reference for building PersistentTasksCustomMetaData

List<String> failedTasks = new ArrayList<>();
List<String> failedReasons = new ArrayList<>();
for (String transformId : hitsAndIds.v2()) {
PersistentTasksCustomMetaData.PersistentTask<?> dfTask = tasks.getTask(transformId);
if (dfTask.getState() instanceof DataFrameTransformState
&& ((DataFrameTransformState) dfTask.getState()).getTaskState() == DataFrameTransformTaskState.FAILED) {
failedTasks.add(transformId);
failedReasons.add(((DataFrameTransformState) dfTask.getState()).getReason());
}
}
if (failedTasks.isEmpty() == false) {
String msg = failedTasks.size() == 1 ?
"Unable to stop data frame transform [" + request.getId()
+ "] as it is in a failed state with reason [" + failedReasons.get(0)
+ "]. Use force stop to stop the data frame transform." :
"Unable to stop data frame transforms. The following transforms are in a failed state " +
failedTasks + " with reasons " + failedReasons + ". Use force stop to stop the data frame transforms.";
listener.onFailure(new ElasticsearchStatusException(msg, RestStatus.CONFLICT));
return;
}
}
request.setExpandedIds(new HashSet<>(hitsAndIds.v2()));
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), clusterService.state()));
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), state));
super.doExecute(task, request, finalListener);
},
listener::onFailure
Expand All @@ -108,6 +134,8 @@ protected void taskOperation(StopDataFrameTransformAction.Request request, DataF
}

if (ids.contains(transformTask.getTransformId())) {
// This should not occur as we validate that none of the tasks are in a failed state earlier
// Keep this check in here for insurance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
listener.onFailure(
new ElasticsearchStatusException("Unable to stop data frame transform [" + request.getId()
Expand Down