Skip to content

Commit 0feb2a6

Browse files
authored
ML: Fix race condition error when stopping _all datafeeds and closing _all jobs (#38113) (#38211)
* ML: Ignore when task is not found for _all
* Addressing PR comments
* Update TransportStopDatafeedAction.java
1 parent 55e0019 commit 0feb2a6

File tree

2 files changed

+24
-4
lines changed

2 files changed

+24
-4
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.ml.action;
77

88
import org.elasticsearch.ElasticsearchException;
9+
import org.elasticsearch.ResourceNotFoundException;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.ActionListenerResponseHandler;
1112
import org.elasticsearch.action.FailedNodeException;
@@ -17,6 +18,7 @@
1718
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1819
import org.elasticsearch.cluster.node.DiscoveryNodes;
1920
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.Strings;
2022
import org.elasticsearch.common.inject.Inject;
2123
import org.elasticsearch.common.io.stream.StreamInput;
2224
import org.elasticsearch.common.settings.Settings;
@@ -291,7 +293,12 @@ protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAct
291293
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
292294
@Override
293295
public void onFailure(Exception e) {
294-
listener.onFailure(e);
296+
if (e instanceof ResourceNotFoundException && Strings.isAllOrWildcard(new String[]{request.getJobId()})) {
297+
jobTask.closeJob("close job (api)");
298+
listener.onResponse(new CloseJobAction.Response(true));
299+
} else {
300+
listener.onFailure(e);
301+
}
295302
}
296303

297304
@Override
@@ -356,7 +363,10 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
356363
@Override
357364
public void onFailure(Exception e) {
358365
final int slot = counter.incrementAndGet();
359-
failures.set(slot - 1, e);
366+
if ((e instanceof ResourceNotFoundException &&
367+
Strings.isAllOrWildcard(new String[]{request.getJobId()})) == false) {
368+
failures.set(slot - 1, e);
369+
}
360370
if (slot == numberOfJobs) {
361371
sendResponseOrFailure(request.getJobId(), listener, failures);
362372
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1818
import org.elasticsearch.cluster.node.DiscoveryNodes;
1919
import org.elasticsearch.cluster.service.ClusterService;
20+
import org.elasticsearch.common.Strings;
2021
import org.elasticsearch.common.inject.Inject;
2122
import org.elasticsearch.common.io.stream.StreamInput;
2223
import org.elasticsearch.common.settings.Settings;
@@ -193,7 +194,10 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persisten
193194
@Override
194195
public void onFailure(Exception e) {
195196
final int slot = counter.incrementAndGet();
196-
failures.set(slot - 1, e);
197+
if ((e instanceof ResourceNotFoundException &&
198+
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) {
199+
failures.set(slot - 1, e);
200+
}
197201
if (slot == startedDatafeeds.size()) {
198202
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
199203
}
@@ -221,7 +225,13 @@ protected void taskOperation(StopDatafeedAction.Request request, TransportStartD
221225
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
222226
@Override
223227
public void onFailure(Exception e) {
224-
listener.onFailure(e);
228+
if ((e instanceof ResourceNotFoundException &&
229+
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()}))) {
230+
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
231+
listener.onResponse(new StopDatafeedAction.Response(true));
232+
} else {
233+
listener.onFailure(e);
234+
}
225235
}
226236

227237
@Override

0 commit comments

Comments
 (0)