Skip to content

Commit 1f1868a

Browse files
authored
[ML][Data Frame] pull state and stats for indexer from index (#42856) (#42935)
* [ML][Data Frame] pull state and stats for indexer from index * Update DataFrameTransformTask.java
1 parent 02e6acf commit 1f1868a

File tree

2 files changed

+20
-16
lines changed

2 files changed

+20
-16
lines changed

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

+12-14
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,6 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
111111
new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId)
112112
.setAuditor(auditor)
113113
.setClient(client)
114-
.setIndexerState(currentIndexerState(transformPTaskState))
115-
// If the transform persistent task state is `null` that means this is a "first run".
116-
// If we have state then the task has relocated from another node in which case this
117-
// state is preferred
118-
.setInitialPosition(transformPTaskState == null ? null : transformPTaskState.getPosition())
119-
.setProgress(transformPTaskState == null ? null : transformPTaskState.getProgress())
120114
.setTransformsCheckpointService(dataFrameTransformsCheckpointService)
121115
.setTransformsConfigManager(transformsConfigManager);
122116

@@ -132,18 +126,22 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
132126
// Schedule execution regardless
133127
ActionListener<DataFrameTransformStateAndStats> transformStatsActionListener = ActionListener.wrap(
134128
stateAndStats -> {
135-
indexerBuilder.setInitialStats(stateAndStats.getTransformStats());
136-
if (transformPTaskState == null) { // prefer the persistent task state
137-
indexerBuilder.setInitialPosition(stateAndStats.getTransformState().getPosition());
138-
indexerBuilder.setProgress(stateAndStats.getTransformState().getProgress());
139-
}
140-
141-
final Long checkpoint = previousCheckpoint != null ? previousCheckpoint : stateAndStats.getTransformState().getCheckpoint();
129+
logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString());
130+
indexerBuilder.setInitialStats(stateAndStats.getTransformStats())
131+
.setInitialPosition(stateAndStats.getTransformState().getPosition())
132+
.setProgress(stateAndStats.getTransformState().getProgress())
133+
.setIndexerState(currentIndexerState(stateAndStats.getTransformState()));
134+
logger.info("[{}] Loading existing state: [{}], position [{}]",
135+
transformId,
136+
stateAndStats.getTransformState(),
137+
stateAndStats.getTransformState().getPosition());
138+
139+
final Long checkpoint = stateAndStats.getTransformState().getCheckpoint();
142140
startTask(buildTask, indexerBuilder, checkpoint, startTaskListener);
143141
},
144142
error -> {
145143
if (error instanceof ResourceNotFoundException == false) {
146-
logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
144+
logger.warn("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
147145
}
148146
startTask(buildTask, indexerBuilder, previousCheckpoint, startTaskListener);
149147
}

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,10 @@ public DataFrameTransformTask(long id, String type, String action, TaskId parent
8989
String initialReason = null;
9090
long initialGeneration = 0;
9191
Map<String, Object> initialPosition = null;
92-
logger.trace("[{}] init, got state: [{}]", transform.getId(), state != null);
9392
if (state != null) {
9493
initialTaskState = state.getTaskState();
9594
initialReason = state.getReason();
9695
final IndexerState existingState = state.getIndexerState();
97-
logger.info("[{}] Loading existing state: [{}], position [{}]", transform.getId(), existingState, state.getPosition());
9896
if (existingState.equals(IndexerState.INDEXING)) {
9997
// reset to started as no indexer is running
10098
initialState = IndexerState.STARTED;
@@ -213,6 +211,10 @@ public synchronized void start(Long startingCheckpoint, ActionListener<Response>
213211
getIndexer().getProgress());
214212

215213
logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
214+
// Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
215+
// This keeps track of STARTED, FAILED, STOPPED
216+
// This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
217+
// we could not read the previous state information from said index.
216218
persistStateToClusterState(state, ActionListener.wrap(
217219
task -> {
218220
auditor.info(transform.getId(),
@@ -301,6 +303,10 @@ synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
301303
taskState.set(DataFrameTransformTaskState.FAILED);
302304
stateReason.set(reason);
303305
auditor.error(transform.getId(), reason);
306+
// Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
307+
// This keeps track of STARTED, FAILED, STOPPED
308+
// This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
309+
// we could not read the previous state information from said index.
304310
persistStateToClusterState(getState(), ActionListener.wrap(
305311
r -> listener.onResponse(null),
306312
listener::onFailure

0 commit comments

Comments
 (0)