Skip to content

Commit 70c099a

Browse files
authored
[ML][Data Frame] forcing that no ptask => STOPPED state (#42800)
* [ML][Data Frame] forcing that no ptask => STOPPED state * Addressing side-effect, early exit for stop when stopped
1 parent 2d23399 commit 70c099a

File tree

4 files changed

+24
-4
lines changed

4 files changed

+24
-4
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,8 @@ private IndexerState finishAndSetState() {
284284
AtomicBoolean callOnStop = new AtomicBoolean(false);
285285
AtomicBoolean callOnAbort = new AtomicBoolean(false);
286286
IndexerState updatedState = state.updateAndGet(prev -> {
287+
callOnAbort.set(false);
288+
callOnStop.set(false);
287289
switch (prev) {
288290
case INDEXING:
289291
// ready for another job

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request;
2727
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
2828
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
29+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
2930
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
31+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
32+
import org.elasticsearch.xpack.core.indexing.IndexerState;
3033
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
3134
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
3235
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
@@ -136,7 +139,21 @@ private void collectStatsForTransformsWithoutTasks(Request request,
136139
ActionListener<List<DataFrameTransformStateAndStats>> searchStatsListener = ActionListener.wrap(
137140
stats -> {
138141
List<DataFrameTransformStateAndStats> allStateAndStats = response.getTransformsStateAndStats();
139-
allStateAndStats.addAll(stats);
142+
// If the persistent task does NOT exist, it is STOPPED
143+
// There is a potential race condition where the saved document does not actually have a STOPPED state
144+
// as the task is cancelled before we persist state.
145+
stats.forEach(stat ->
146+
allStateAndStats.add(new DataFrameTransformStateAndStats(
147+
stat.getId(),
148+
new DataFrameTransformState(DataFrameTransformTaskState.STOPPED,
149+
IndexerState.STOPPED,
150+
stat.getTransformState().getPosition(),
151+
stat.getTransformState().getCheckpoint(),
152+
stat.getTransformState().getReason(),
153+
stat.getTransformState().getProgress()),
154+
stat.getTransformStats(),
155+
stat.getCheckpointingInfo()))
156+
);
140157
transformsWithoutTasks.removeAll(
141158
stats.stream().map(DataFrameTransformStateAndStats::getId).collect(Collectors.toSet()));
142159

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,10 @@ public synchronized void stop() {
237237
return;
238238
}
239239

240+
if (getIndexer().getState() == IndexerState.STOPPED) {
241+
return;
242+
}
243+
240244
IndexerState state = getIndexer().stop();
241245
if (state == IndexerState.STOPPED) {
242246
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop());

x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,6 @@ teardown:
9090
- match: { airline-data-by-airline-start-stop.mappings: {} }
9191
---
9292
"Test start/stop/start transform":
93-
- skip:
94-
reason: "https://github.com/elastic/elasticsearch/issues/42650"
95-
version: "all"
9693
- do:
9794
data_frame.start_data_frame_transform:
9895
transform_id: "airline-transform-start-stop"

0 commit comments

Comments
 (0)