Skip to content

Commit 5db3050

Browse files
authored
ML: Fix error race condition on stop _all datafeeds and close _all jobs (#38113)
* ML: Ignore when task is not found for _all * Addressing PR comments * Update TransportStopDatafeedAction.java
1 parent f5f3cb8 commit 5db3050

File tree

2 files changed

+24
-4
lines changed

2 files changed

+24
-4
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.ml.action;
77

88
import org.elasticsearch.ElasticsearchException;
9+
import org.elasticsearch.ResourceNotFoundException;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.ActionListenerResponseHandler;
1112
import org.elasticsearch.action.FailedNodeException;
@@ -16,6 +17,7 @@
1617
import org.elasticsearch.cluster.ClusterState;
1718
import org.elasticsearch.cluster.node.DiscoveryNodes;
1819
import org.elasticsearch.cluster.service.ClusterService;
20+
import org.elasticsearch.common.Strings;
1921
import org.elasticsearch.common.inject.Inject;
2022
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2123
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -272,7 +274,12 @@ protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAct
272274
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
273275
@Override
274276
public void onFailure(Exception e) {
275-
listener.onFailure(e);
277+
if (e instanceof ResourceNotFoundException && Strings.isAllOrWildcard(new String[]{request.getJobId()})) {
278+
jobTask.closeJob("close job (api)");
279+
listener.onResponse(new CloseJobAction.Response(true));
280+
} else {
281+
listener.onFailure(e);
282+
}
276283
}
277284

278285
@Override
@@ -332,7 +339,10 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
332339
@Override
333340
public void onFailure(Exception e) {
334341
final int slot = counter.incrementAndGet();
335-
failures.set(slot - 1, e);
342+
if ((e instanceof ResourceNotFoundException &&
343+
Strings.isAllOrWildcard(new String[]{request.getJobId()})) == false) {
344+
failures.set(slot - 1, e);
345+
}
336346
if (slot == numberOfJobs) {
337347
sendResponseOrFailure(request.getJobId(), listener, failures);
338348
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.ClusterState;
1717
import org.elasticsearch.cluster.node.DiscoveryNodes;
1818
import org.elasticsearch.cluster.service.ClusterService;
19+
import org.elasticsearch.common.Strings;
1920
import org.elasticsearch.common.inject.Inject;
2021
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2122
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -187,7 +188,10 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persisten
187188
@Override
188189
public void onFailure(Exception e) {
189190
final int slot = counter.incrementAndGet();
190-
failures.set(slot - 1, e);
191+
if ((e instanceof ResourceNotFoundException &&
192+
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) {
193+
failures.set(slot - 1, e);
194+
}
191195
if (slot == startedDatafeeds.size()) {
192196
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
193197
}
@@ -215,7 +219,13 @@ protected void taskOperation(StopDatafeedAction.Request request, TransportStartD
215219
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
216220
@Override
217221
public void onFailure(Exception e) {
218-
listener.onFailure(e);
222+
if ((e instanceof ResourceNotFoundException &&
223+
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()}))) {
224+
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
225+
listener.onResponse(new StopDatafeedAction.Response(true));
226+
} else {
227+
listener.onFailure(e);
228+
}
219229
}
220230

221231
@Override

0 commit comments

Comments
 (0)