Skip to content

Commit be381b4

Browse files
authored
ML: better handle task state race condition (#38040)
1 parent 8e95780 commit be381b4

File tree

1 file changed

+26
-2
lines changed

1 file changed

+26
-2
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
*/
66
package org.elasticsearch.xpack.ml.action;
77

8+
import org.elasticsearch.ElasticsearchException;
89
import org.elasticsearch.ElasticsearchStatusException;
910
import org.elasticsearch.ElasticsearchTimeoutException;
11+
import org.elasticsearch.ResourceNotFoundException;
1012
import org.elasticsearch.action.ActionListener;
1113
import org.elasticsearch.action.support.ActionFilters;
1214
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -33,11 +35,13 @@
3335
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
3436
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
3537

38+
import java.util.Comparator;
3639
import java.util.List;
3740
import java.util.Set;
3841
import java.util.concurrent.atomic.AtomicBoolean;
3942
import java.util.stream.Collectors;
4043

44+
import static org.elasticsearch.ExceptionsHelper.rethrowAndSuppress;
4145
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
4246
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
4347
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
@@ -119,9 +123,20 @@ protected void masterOperation(SetUpgradeModeAction.Request request, ClusterStat
119123
.cluster()
120124
.prepareListTasks()
121125
.setActions(DATAFEED_TASK_NAME + "[c]", JOB_TASK_NAME + "[c]")
126+
// There is a chance that we failed un-allocating a task due to allocation_id being changed
127+
// This call will timeout in that case and return an error
122128
.setWaitForCompletion(true)
123129
.setTimeout(request.timeout()).execute(ActionListener.wrap(
124-
r -> wrappedListener.onResponse(new AcknowledgedResponse(true)),
130+
r -> {
131+
try {
132+
// Handle potential node timeouts,
133+
// these should be considered failures as tasks as still potentially executing
134+
rethrowAndSuppress(r.getNodeFailures());
135+
wrappedListener.onResponse(new AcknowledgedResponse(true));
136+
} catch (ElasticsearchException ex) {
137+
wrappedListener.onFailure(ex);
138+
}
139+
},
125140
wrappedListener::onFailure));
126141
},
127142
wrappedListener::onFailure
@@ -243,10 +258,19 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe
243258
.stream()
244259
.filter(persistentTask -> (persistentTask.getTaskName().equals(MlTasks.JOB_TASK_NAME) ||
245260
persistentTask.getTaskName().equals(MlTasks.DATAFEED_TASK_NAME)))
261+
// We want to always have the same ordering of which tasks we un-allocate first.
262+
// However, the order in which the distributed tasks handle the un-allocation event is not guaranteed.
263+
.sorted(Comparator.comparing(PersistentTask::getTaskName))
246264
.collect(Collectors.toList());
247265

248266
TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor =
249-
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true);
267+
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()),
268+
r -> true,
269+
// Another process could modify tasks and thus we cannot find them via the allocation_id and name
270+
// If the task was removed from the node, all is well
271+
// We handle the case of allocation_id changing later in this transport class by timing out waiting for task completion
272+
// Consequently, if the exception is ResourceNotFoundException, continue execution; circuit break otherwise.
273+
ex -> ex instanceof ResourceNotFoundException == false);
250274

251275
for (PersistentTask<?> task : datafeedAndJobTasks) {
252276
chainTaskExecutor.add(

0 commit comments

Comments
 (0)