Skip to content

Commit e5e8fa8

Browse files
authored
[ML] Fix simultaneous stop and force stop datafeed (#49367)
If a datafeed is stopped normally and force stopped at the same time then it is possible that the force stop removes the persistent task while the normal stop is performing actions. Currently this causes the normal stop to error, but since stopping a stopped datafeed is not an error this doesn't make sense. Instead the force stop should just take precedence. This is a followup to #49191 and should really have been included in the changes in that PR.
1 parent 039da97 commit e5e8fa8

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.cluster.ClusterState;
1717
import org.elasticsearch.cluster.node.DiscoveryNodes;
1818
import org.elasticsearch.cluster.service.ClusterService;
19-
import org.elasticsearch.common.Strings;
2019
import org.elasticsearch.common.inject.Inject;
2120
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2221
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -259,17 +258,18 @@ protected void taskOperation(StopDatafeedAction.Request request, TransportStartD
259258
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
260259
@Override
261260
public void onFailure(Exception e) {
262-
if ((e instanceof ResourceNotFoundException &&
263-
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()}))) {
264-
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
261+
// We validated that the datafeed names supplied in the request existed when we started processing the action.
262+
// If the related task for one of them doesn't exist at this point then it must have been removed by a
263+
// simultaneous force stop request. This is not an error.
264+
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
265265
listener.onResponse(new StopDatafeedAction.Response(true));
266266
} else {
267267
listener.onFailure(e);
268268
}
269269
}
270270

271271
@Override
272-
protected void doRun() throws Exception {
272+
protected void doRun() {
273273
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
274274
listener.onResponse(new StopDatafeedAction.Response(true));
275275
}
@@ -343,7 +343,7 @@ protected StopDatafeedAction.Response newResponse(StopDatafeedAction.Request req
343343
throw org.elasticsearch.ExceptionsHelper
344344
.convertToElastic(failedNodeExceptions.get(0));
345345
} else {
346-
// This can happen we the actual task in the node no longer exists,
346+
// This can happen when the actual task in the node no longer exists,
347347
// which means the datafeed(s) have already been stopped.
348348
return new StopDatafeedAction.Response(true);
349349
}

0 commit comments

Comments
 (0)