
[ML][Data Frame] fixing _start?force=true bug #45660


Merged
File: TransportStartDataFrameTransformTaskAction.java
@@ -59,7 +59,6 @@ protected void doExecute(Task task, StartDataFrameTransformTaskAction.Request re
protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
if (transformTask.getTransformId().equals(request.getId())) {
- //TODO fix bug as .start where it was failed could result in a null current checkpoint?
transformTask.start(null, request.isForce(), listener);
} else {
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
File: DataFrameTransformPersistentTasksExecutor.java
@@ -35,7 +35,6 @@
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStoredDoc;
- import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.DataFrame;
@@ -120,13 +119,14 @@ static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat
protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
final String transformId = params.getId();
final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
- final DataFrameTransformState transformPTaskState = (DataFrameTransformState) state;
- // If the transform is failed then the Persistent Task Service will
- // try to restart it on a node restart. Exiting here leaves the
- // transform in the failed state and it must be force closed.
- if (transformPTaskState != null && transformPTaskState.getTaskState() == DataFrameTransformTaskState.FAILED) {
-     return;
- }
+ // NOTE: DataFrameTransformPersistentTasksExecutor#createTask pulls in the stored task state from the ClusterState when the object
+ // is created. DataFrameTransformTask#ctor takes into account setting the task as failed if that is passed in with the
+ // persisted state.
+ // DataFrameTransformPersistentTasksExecutor#startTask will fail as DataFrameTransformTask#start, when force == false, will return
+ // a failure indicating that a failed task cannot be started.
+ //
+ // We want the rest of the state to be populated in the task when it is loaded on the node so that users can force start it again
+ // later if they want.

final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId)
@@ -298,6 +298,7 @@ private void startTask(DataFrameTransformTask buildTask,
Long previousCheckpoint,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
buildTask.initializeIndexer(indexerBuilder);
+ // DataFrameTransformTask#start will fail if the task state is FAILED
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener);
}

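The NOTE added in this hunk relies on a gate that can be compressed into a few lines. Below is a hedged sketch of that behavior (hypothetical class and simplified types, not the actual implementation): start with force == false refuses a FAILED task, but the task's state stays loaded on the node so a later force start can succeed.

```java
public class StartGateSketch {
    public static void main(String[] args) {
        start("STARTED", false); // proceeds normally
        start("FAILED", true);   // proceeds: force bypasses the gate
        start("FAILED", false);  // throws: user must force stop or force start
    }

    static void start(String taskState, boolean force) {
        if ("FAILED".equals(taskState) && force == false) {
            // The task stays allocated with its state populated, so the user
            // can inspect it and force start it later.
            throw new IllegalStateException("cannot start a failed transform without force");
        }
        System.out.println("starting with state " + taskState);
    }
}
```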
File: DataFrameTransformTask.java
@@ -235,7 +235,7 @@ public long getInProgressCheckpoint() {
}
}

- public void setTaskStateStopped() {
+ public synchronized void setTaskStateStopped() {
Member Author commented:
I made this synchronized because start, stop, and markAsFailed are all synchronized, and we do not want the task state flipping between states while we are attempting a transition.

doSaveState calls setTaskStateStopped, and since synchronized methods use reentrant locks, stop calling doSaveState directly in certain scenarios should not be a problem.
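As a minimal illustration of the reentrancy point (a hypothetical class, not the PR code): Java intrinsic locks are reentrant, so a synchronized method can invoke another synchronized method on the same instance without blocking on itself.

```java
class ReentrancyDemo {
    private String state = "STARTED";

    // Holds this object's monitor for the whole call chain.
    synchronized void stop() {
        doSaveState(); // nested synchronized call: the same thread re-acquires the same lock
    }

    synchronized void doSaveState() {
        setTaskStateStopped(); // two levels deep, still no deadlock
    }

    synchronized void setTaskStateStopped() {
        state = "STOPPED";
    }
}
```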

taskState.set(DataFrameTransformTaskState.STOPPED);
}

@@ -256,8 +256,16 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
return;
}
if (getIndexer() == null) {
- listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later",
-     getTransformId()));
+ // If our state is failed AND the indexer is null, the user needs to _stop?force=true so that the indexer gets
+ // fully initialized.
+ // If we are NOT failed, then we can assume that `start` was just called early in the process.
+ String msg = taskState.get() == DataFrameTransformTaskState.FAILED ?
+     "It failed during the initialization process; force stop to allow reinitialization." :
Member Author commented:
If we failed before even being fully initialized, there was probably something REALLY wrong with the cluster at the time. We should indicate to the user that _stop?force=true needs to be called before attempting to start again.
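To make the recovery path concrete, here is a hedged sketch of the operator workflow this comment describes, using Java 11's built-in java.net.http client. The cluster address and transform id are illustrative assumptions; the 7.x _data_frame endpoints and the 409 CONFLICT status come from the PR's context.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ForceStopThenStart {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://localhost:9200/_data_frame/transforms/my-transform";

        // A _start on a task that failed during initialization now returns 409 CONFLICT.
        System.out.println(post(client, base + "/_start").statusCode());

        // Force stop clears the failed task so the indexer can be rebuilt...
        post(client, base + "/_stop?force=true");

        // ...after which a plain _start can reinitialize and run it.
        post(client, base + "/_start");
    }

    private static HttpResponse<String> post(HttpClient client, String url) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        return client.send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```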

"Try again later.";
listener.onFailure(new ElasticsearchStatusException("Task for transform [{}] not fully initialized. {}",
RestStatus.CONFLICT,
getTransformId(),
msg));
return;
}
final IndexerState newState = getIndexer().start();
@@ -409,6 +417,13 @@ void persistStateToClusterState(DataFrameTransformState state,
}

synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
+ // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
+ // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
+ if (taskState.get() == DataFrameTransformTaskState.FAILED) {
Member Author commented:
The only reason to move this up earlier in the stack is to prevent needless checks. Additionally, we don't want to accidentally return weird error messages, since a task could potentially (however rarely) be FAILED while the indexer is in a STOPPED or STOPPING state.
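A minimal sketch of why the ordering matters (hypothetical names, states simplified to strings): with the FAILED check first, a task that is FAILED while its indexer happens to be STOPPING reports "already failed" instead of falling through to the stopping branch.

```java
public class GuardOrderDemo {
    static String markAsFailed(String taskState, String indexerState) {
        // FAILED is checked before any indexer-state checks.
        if ("FAILED".equals(taskState)) {
            return "already failed; ignoring new failure";
        }
        if ("STOPPING".equals(indexerState)) {
            return "stop in progress; let the normal stopping logic finish";
        }
        return "transitioning task to FAILED";
    }

    public static void main(String[] args) {
        // FAILED task with a STOPPING indexer: the early check wins.
        System.out.println(markAsFailed("FAILED", "STOPPING"));
        // Healthy task with a STOPPING indexer: defer to the stop path.
        System.out.println(markAsFailed("STARTED", "STOPPING"));
    }
}
```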

+     logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason);
+     listener.onResponse(null);
+     return;
+ }
// If the indexer is `STOPPING` this means that `DataFrameTransformTask#stop` was called previously, but something caused
// the indexer to fail. Since `ClientDataFrameIndexer#doSaveState` will persist the state to the index once the indexer stops,
// it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
@@ -425,26 +440,13 @@ synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
listener.onResponse(null);
return;
}
- // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
- // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
- if (taskState.get() == DataFrameTransformTaskState.FAILED) {
-     logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason);
-     listener.onResponse(null);
-     return;
- }
auditor.error(transform.getId(), reason);
// We should not keep retrying. Either the task will be stopped, or started
// If it is started again, it is registered again.
deregisterSchedulerJob();
- DataFrameTransformState newState = new DataFrameTransformState(
-     DataFrameTransformTaskState.FAILED,
-     getIndexer() == null ? initialIndexerState : getIndexer().getState(),
-     getIndexer() == null ? initialPosition : getIndexer().getPosition(),
-     currentCheckpoint.get(),
-     reason,
-     getIndexer() == null ? null : getIndexer().getProgress());
taskState.set(DataFrameTransformTaskState.FAILED);
stateReason.set(reason);
+ DataFrameTransformState newState = getState();
Member Author commented:
This is safe to call like this now, since setTaskStateStopped is synchronized.
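A small sketch of the point (hypothetical fields, not the task's real ones): because every mutator shares the task's monitor, no state transition can interleave while markAsFailed composes the snapshot.

```java
class SnapshotDemo {
    private String taskState = "STARTED";
    private String reason = null;

    synchronized void markAsFailed(String why) {
        taskState = "FAILED";
        reason = why;
        // Reads happen under the same lock as the writes above, so the
        // snapshot cannot mix pre- and post-transition values.
        System.out.println(getState());
    }

    // Also synchronized, as setTaskStateStopped now is, so a concurrent stop
    // cannot flip the state mid-snapshot.
    synchronized String getState() {
        return taskState + (reason == null ? "" : " (" + reason + ")");
    }
}
```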

// Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
// This keeps track of STARTED, FAILED, STOPPED
// This is because a FAILED state could occur because we failed to read the config from the internal index, which would imply that