Skip to content

Commit ce6106f

Browse files
authored
[ML][Data Frame] fixing _start?force=true bug (#45660)

* [ML][Data Frame] fixing _start?force=true bug
* removing unused import
* removing old TODO
1 parent 0e4a837 commit ce6106f

File tree

3 files changed

+28
-26
lines changed

3 files changed

+28
-26
lines changed

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java

-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ protected void doExecute(Task task, StartDataFrameTransformTaskAction.Request re
5959
protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
6060
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
6161
if (transformTask.getTransformId().equals(request.getId())) {
62-
//TODO fix bug as .start where it was failed could result in a null current checkpoint?
6362
transformTask.start(null, request.isForce(), listener);
6463
} else {
6564
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

+9-8
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
3636
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
3737
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStoredDoc;
38-
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
3938
import org.elasticsearch.xpack.core.indexing.IndexerState;
4039
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
4140
import org.elasticsearch.xpack.dataframe.DataFrame;
@@ -120,13 +119,14 @@ static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat
120119
protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
121120
final String transformId = params.getId();
122121
final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
123-
final DataFrameTransformState transformPTaskState = (DataFrameTransformState) state;
124-
// If the transform is failed then the Persistent Task Service will
125-
// try to restart it on a node restart. Exiting here leaves the
126-
// transform in the failed state and it must be force closed.
127-
if (transformPTaskState != null && transformPTaskState.getTaskState() == DataFrameTransformTaskState.FAILED) {
128-
return;
129-
}
122+
// NOTE: DataFrameTransformPersistentTasksExecutor#createTask pulls in the stored task state from the ClusterState when the object
123+
// is created. DataFrameTransformTask#ctor takes into account setting the task as failed if that is passed in with the
124+
// persisted state.
125+
// DataFrameTransformPersistentTasksExecutor#startTask will fail as DataFrameTransformTask#start, when force == false, will return
126+
// a failure indicating that a failed task cannot be started.
127+
//
128+
// We want the rest of the state to be populated in the task when it is loaded on the node so that users can force start it again
129+
// later if they want.
130130

131131
final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
132132
new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId)
@@ -298,6 +298,7 @@ private void startTask(DataFrameTransformTask buildTask,
298298
Long previousCheckpoint,
299299
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
300300
buildTask.initializeIndexer(indexerBuilder);
301+
// DataFrameTransformTask#start will fail if the task state is FAILED
301302
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener);
302303
}
303304

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

+19-17
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public long getInProgressCheckpoint() {
235235
}
236236
}
237237

238-
public void setTaskStateStopped() {
238+
public synchronized void setTaskStateStopped() {
239239
taskState.set(DataFrameTransformTaskState.STOPPED);
240240
}
241241

@@ -256,8 +256,16 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
256256
return;
257257
}
258258
if (getIndexer() == null) {
259-
listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later",
260-
getTransformId()));
259+
// If our state is failed AND the indexer is null, the user needs to _stop?force=true so that the indexer gets
260+
// fully initialized.
261+
// If we are NOT failed, then we can assume that `start` was just called early in the process.
262+
String msg = taskState.get() == DataFrameTransformTaskState.FAILED ?
263+
"It failed during the initialization process; force stop to allow reinitialization." :
264+
"Try again later.";
265+
listener.onFailure(new ElasticsearchStatusException("Task for transform [{}] not fully initialized. {}",
266+
RestStatus.CONFLICT,
267+
getTransformId(),
268+
msg));
261269
return;
262270
}
263271
final IndexerState newState = getIndexer().start();
@@ -409,6 +417,13 @@ void persistStateToClusterState(DataFrameTransformState state,
409417
}
410418

411419
synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
420+
// If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
421+
// flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
422+
if (taskState.get() == DataFrameTransformTaskState.FAILED) {
423+
logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason);
424+
listener.onResponse(null);
425+
return;
426+
}
412427
// If the indexer is `STOPPING` this means that `DataFrameTransformTask#stop` was called previously, but something caused
413428
// the indexer to fail. Since `ClientDataFrameIndexer#doSaveState` will persist the state to the index once the indexer stops,
414429
// it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
@@ -425,26 +440,13 @@ synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
425440
listener.onResponse(null);
426441
return;
427442
}
428-
// If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
429-
// flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
430-
if (taskState.get() == DataFrameTransformTaskState.FAILED) {
431-
logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason);
432-
listener.onResponse(null);
433-
return;
434-
}
435443
auditor.error(transform.getId(), reason);
436444
// We should not keep retrying. Either the task will be stopped, or started
437445
// If it is started again, it is registered again.
438446
deregisterSchedulerJob();
439-
DataFrameTransformState newState = new DataFrameTransformState(
440-
DataFrameTransformTaskState.FAILED,
441-
getIndexer() == null ? initialIndexerState : getIndexer().getState(),
442-
getIndexer() == null ? initialPosition : getIndexer().getPosition(),
443-
currentCheckpoint.get(),
444-
reason,
445-
getIndexer() == null ? null : getIndexer().getProgress());
446447
taskState.set(DataFrameTransformTaskState.FAILED);
447448
stateReason.set(reason);
449+
DataFrameTransformState newState = getState();
448450
// Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
449451
// This keeps track of STARTED, FAILED, STOPPED
450452
// This is because a FAILED state could occur because we failed to read the config from the internal index, which would imply that

0 commit comments

Comments (0)